diff --git a/pkg/epp/datalayer/attributemap_test.go b/pkg/epp/datalayer/attributemap_test.go index a6697bd21..c4ad5697f 100644 --- a/pkg/epp/datalayer/attributemap_test.go +++ b/pkg/epp/datalayer/attributemap_test.go @@ -43,6 +43,9 @@ func TestExpectPutThenGetToMatch(t *testing.T) { dv, ok := got.(*dummy) assert.True(t, ok, "expected value to be of type *dummy") assert.Equal(t, "foo", dv.Text) + + _, ok = attrs.Get("b") + assert.False(t, ok, "expected key not to exist") } func TestExpectKeysToMatchAdded(t *testing.T) { diff --git a/pkg/epp/datalayer/datasource_test.go b/pkg/epp/datalayer/datasource_test.go index 3ed47cf44..e5cb41dfd 100644 --- a/pkg/epp/datalayer/datasource_test.go +++ b/pkg/epp/datalayer/datasource_test.go @@ -44,6 +44,9 @@ func TestRegisterAndGetSource(t *testing.T) { err = reg.Register(ds) assert.Error(t, err, "expected error on duplicate registration") + err = reg.Register(nil) + assert.Error(t, err, "expected error on nil") + // Get by name got, found := reg.GetNamedSource("test") assert.True(t, found, "expected to find registered data source") @@ -53,6 +56,20 @@ func TestRegisterAndGetSource(t *testing.T) { all := reg.GetSources() assert.Len(t, all, 1) assert.Equal(t, "test", all[0].Name()) + + // Default registry + err = RegisterSource(ds) + assert.NoError(t, err, "expected no error on registration") + + // Get by name + got, found = GetNamedSource[*mockDataSource]("test") + assert.True(t, found, "expected to find registered data source") + assert.Equal(t, "test", got.Name()) + + // Get all sources + all = GetSources() + assert.Len(t, all, 1) + assert.Equal(t, "test", all[0].Name()) } func TestGetNamedSourceWhenNotFound(t *testing.T) { diff --git a/pkg/epp/datalayer/factory_test.go b/pkg/epp/datalayer/factory_test.go new file mode 100644 index 000000000..35cce888d --- /dev/null +++ b/pkg/epp/datalayer/factory_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datalayer + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" +) + +func TestFactory(t *testing.T) { + source := &DummySource{} + factory := NewEndpointFactory([]DataSource{source}, 100*time.Millisecond) + + pod1 := &PodInfo{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Address: "1.2.3.4:5678", + } + endpoint1 := factory.NewEndpoint(context.Background(), pod1, nil) + assert.NotNil(t, endpoint1, "failed to create endpoint") + + dup := factory.NewEndpoint(context.Background(), pod1, nil) + assert.Nil(t, dup, "expected to fail to create a duplicate collector") + + pod2 := &PodInfo{ + NamespacedName: types.NamespacedName{ + Name: "pod2", + Namespace: "default", + }, + Address: "1.2.3.4:5679", + } + endpoint2 := factory.NewEndpoint(context.Background(), pod2, nil) + assert.NotNil(t, endpoint2, "failed to create endpoint") + + factory.ReleaseEndpoint(endpoint1) + + // use Eventually for async processing + require.Eventually(t, func() bool { + return atomic.LoadInt64(&source.callCount) == 2 + }, 290*time.Millisecond, 2*time.Millisecond, "expected 2 collections") + + factory.Shutdown() +} diff --git a/pkg/epp/datalayer/metrics/datasource_test.go b/pkg/epp/datalayer/metrics/datasource_test.go new file mode 100644 index 000000000..7c293753f --- /dev/null +++ b/pkg/epp/datalayer/metrics/datasource_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" +) + +func TestDatasource(t *testing.T) { + source := NewDataSource("https", "/metrics", true, nil) + extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "") + assert.Nil(t, err, "failed to create extractor") + + name := source.Name() + assert.Equal(t, DataSourceName, name) + + err = source.AddExtractor(extractor) + assert.Nil(t, err, "failed to add extractor") + + err = source.AddExtractor(extractor) + assert.NotNil(t, err, "expected to fail to add the same extractor twice") + + extractors := source.Extractors() + assert.Len(t, extractors, 1) + assert.Equal(t, extractor.Name(), extractors[0]) + + err = datalayer.RegisterSource(source) + assert.Nil(t, err, "failed to register") + + ctx := context.Background() + factory := datalayer.NewEndpointFactory([]datalayer.DataSource{source}, 100*time.Millisecond) + pod := &datalayer.PodInfo{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Address: "1.2.3.4:5678", + } + endpoint := factory.NewEndpoint(ctx, pod, nil) + assert.NotNil(t, endpoint, "failed to create endpoint") + + err = source.Collect(ctx, endpoint) + assert.NotNil(t, err, "expected to fail to collect metrics") +} diff --git a/pkg/epp/datalayer/metrics/extractor_test.go b/pkg/epp/datalayer/metrics/extractor_test.go index f3acd758b..d8aaca556 100644 --- a/pkg/epp/datalayer/metrics/extractor_test.go +++ b/pkg/epp/datalayer/metrics/extractor_test.go @@ -22,6 +22,7 @@ import ( "github.com/google/go-cmp/cmp" dto "github.com/prometheus/client_model/go" + "google.golang.org/protobuf/proto" "k8s.io/utils/ptr" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" @@ -29,17 +30,33 @@ import ( const ( // use hardcoded values - importing causes cycle - defaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" + defaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" + defaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" + defaultLoraInfoMetric = "vllm:lora_requests_info" + defaultCacheInfoMetric = "vllm:cache_config_info" ) func TestExtractorExtract(t *testing.T) { ctx := context.Background() - extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, "", "", "") + if _, err := NewExtractor("vllm: dummy", "", "", ""); err == nil { + t.Error("expected to fail to create extractor with invalid specification") + } + + extractor, err := NewExtractor(defaultTotalQueuedRequestsMetric, + defaultKvCacheUsagePercentageMetric, defaultLoraInfoMetric, defaultCacheInfoMetric) if err != nil { t.Fatalf("failed to create extractor: %v", err) } + if name := extractor.Name(); name == "" { + t.Error("empty extractor name") + } + + if inputType := extractor.ExpectedInputType(); inputType != PrometheusMetricType { + t.Errorf("incorrect expected input type: %v", inputType) + } + ep := datalayer.NewEndpoint(nil, nil) if ep == nil { t.Fatal("expected non-nil endpoint") @@ -78,6 +95,68 @@ func TestExtractorExtract(t *testing.T) { wantErr: true, // missing metrics can return an error updated: true, // but should still update }, + { + name: "multiple valid metrics", + data: PrometheusMetricMap{ + defaultTotalQueuedRequestsMetric: &dto.MetricFamily{ + Type: dto.MetricType_GAUGE.Enum(), + Metric: []*dto.Metric{ + { + Gauge: &dto.Gauge{Value: ptr.To(5.0)}, + }, + }, + }, + defaultKvCacheUsagePercentageMetric: &dto.MetricFamily{ + Type: dto.MetricType_GAUGE.Enum(), + Metric: []*dto.Metric{ + { + Gauge: &dto.Gauge{Value: ptr.To(0.5)}, + }, + }, + }, + defaultLoraInfoMetric: &dto.MetricFamily{ + Type: dto.MetricType_GAUGE.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: proto.String(LoraInfoRunningAdaptersMetricName), + Value: proto.String("lora1"), + }, + { + Name: proto.String(LoraInfoWaitingAdaptersMetricName), + Value: proto.String("lora2"), + }, + { + Name: proto.String(LoraInfoMaxAdaptersMetricName), + Value: proto.String("1"), + }, + }, + }, + }, + }, + defaultCacheInfoMetric: &dto.MetricFamily{ + Type: dto.MetricType_GAUGE.Enum(), + Metric: []*dto.Metric{ + { + Label: []*dto.LabelPair{ + { + Name: proto.String(CacheConfigBlockSizeInfoMetricName), + Value: proto.String("16"), + }, + { + Name: proto.String(CacheConfigNumGPUBlocksMetricName), + Value: proto.String("1024"), + }, + }, + Gauge: &dto.Gauge{Value: ptr.To(1.0)}, + }, + }, + }, + }, + wantErr: false, + updated: true, + }, } for _, tt := range tests { diff --git a/pkg/epp/datalayer/metrics/logger_test.go b/pkg/epp/datalayer/metrics/logger_test.go new file mode 100644 index 000000000..3ba2e7e84 --- /dev/null +++ b/pkg/epp/datalayer/metrics/logger_test.go @@ -0,0 +1,124 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "bytes" + "context" + "sync" + "testing" + "time" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "go.uber.org/zap/zapcore" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" +) + +// Buffer to write the logs to +type buffer struct { + buf bytes.Buffer + mu sync.Mutex +} + +func (s *buffer) Write(p []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.Write(p) +} + +func (s *buffer) read() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.String() +} + +func TestLogger(t *testing.T) { + // Redirect the logger to a buffer + var b buffer + opts := &zap.Options{ + DestWriter: &b, + Development: true, + Level: zapcore.Level(-5), + } + logger := zap.New(zap.UseFlagOptions(opts)) + ctrl.SetLogger(logger) + ctx, cancel := context.WithCancel(context.Background()) + ctx = logr.NewContext(ctx, logger) + + StartMetricsLogger(ctx, &fakeDataStore{}, 100*time.Millisecond, 100*time.Millisecond) + + time.Sleep(6 * time.Second) + cancel() + + logOutput := b.read() + assert.Contains(t, logOutput, "Refreshing Prometheus Metrics {\"ReadyPods\": 2}") + assert.Contains(t, logOutput, "Current Pods and metrics gathered {\"Fresh metrics\": \"[Pod: {NamespacedName:default/pod1 PodName: Address:1.2.3.4:5678") + assert.Contains(t, logOutput, "Metrics: {ActiveModels:map[modelA:1] WaitingModels:map[modelB:2] MaxActiveModels:5") + assert.Contains(t, logOutput, "RunningQueueSize:3 WaitingQueueSize:7 KVCacheUsagePercent:42.5 KvCacheMaxTokenCapacity:2048") + assert.Contains(t, logOutput, "Pod: {NamespacedName:default/pod2 PodName: Address:1.2.3.4:5679") + assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"") +} + +var pod1 = &datalayer.PodInfo{ + NamespacedName: types.NamespacedName{ + Name: "pod1", + Namespace: "default", + }, + Address: "1.2.3.4:5678", +} +var pod2 = &datalayer.PodInfo{ + NamespacedName: types.NamespacedName{ + Name: "pod2", + Namespace: "default", + }, + Address: "1.2.3.4:5679", +} + +type fakeDataStore struct{} + +func (f *fakeDataStore) PoolGet() (*v1.InferencePool, error) { + return &v1.InferencePool{Spec: v1.InferencePoolSpec{TargetPorts: []v1.Port{{Number: 8000}}}}, nil +} + +func (f *fakeDataStore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint { + var m = &datalayer.Metrics{ + ActiveModels: map[string]int{"modelA": 1}, + WaitingModels: map[string]int{"modelB": 2}, + MaxActiveModels: 5, + RunningQueueSize: 3, + WaitingQueueSize: 7, + KVCacheUsagePercent: 42.5, + KvCacheMaxTokenCapacity: 2048, + UpdateTime: time.Now(), + } + ep1 := datalayer.NewEndpoint(pod1, m) + ep2 := datalayer.NewEndpoint(pod2, m) + pods := []datalayer.Endpoint{ep1, ep2} + res := []datalayer.Endpoint{} + + for _, pod := range pods { + if predicate(pod) { + res = append(res, pod) + } + } + return res +}