diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index 1c7a905281..018241ea8f 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -38,10 +38,10 @@ type FakePodMetrics struct { } func (fpm *FakePodMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetPod(), fpm.GetMetrics()) + return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetMetadata(), fpm.GetMetrics()) } -func (fpm *FakePodMetrics) GetPod() *backend.Pod { +func (fpm *FakePodMetrics) GetMetadata() *backend.Pod { return fpm.Pod } @@ -49,7 +49,7 @@ func (fpm *FakePodMetrics) GetMetrics() *MetricsState { return fpm.Metrics } -func (fpm *FakePodMetrics) UpdatePod(pod *datalayer.PodInfo) { +func (fpm *FakePodMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { fpm.Pod = pod } func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes { diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index 4d22ef18ce..b99392af60 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -25,7 +25,6 @@ import ( "github.com/go-logr/logr" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -35,7 +34,7 @@ const ( ) type podMetrics struct { - pod atomic.Pointer[backend.Pod] + metadata atomic.Pointer[datalayer.EndpointMetadata] metrics atomic.Pointer[MetricsState] pmc PodMetricsClient ds datalayer.PoolInfo @@ -49,23 +48,23 @@ type podMetrics struct { } type PodMetricsClient interface { - FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState) (*MetricsState, error) + FetchMetrics(ctx context.Context, pod *datalayer.EndpointMetadata, existing *MetricsState) (*MetricsState, error) } func (pm *podMetrics) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics()) + return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics()) } -func (pm *podMetrics) GetPod() *backend.Pod { - return pm.pod.Load() +func (pm *podMetrics) GetMetadata() *datalayer.EndpointMetadata { + return pm.metadata.Load() } func (pm *podMetrics) GetMetrics() *MetricsState { return pm.metrics.Load() } -func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { - pm.pod.Store(pod) +func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) { + pm.metadata.Store(pod) } // start starts a goroutine exactly once to periodically update metrics. The goroutine will be @@ -73,7 +72,7 @@ func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) { func (pm *podMetrics) startRefreshLoop(ctx context.Context) { pm.startOnce.Do(func() { go func() { - pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetMetadata()) ticker := time.NewTicker(pm.interval) defer ticker.Stop() for { @@ -84,7 +83,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { return case <-ticker.C: // refresh metrics periodically if err := pm.refreshMetrics(); err != nil { - pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod()) + pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetMetadata()) } } } @@ -95,7 +94,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) { func (pm *podMetrics) refreshMetrics() error { ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) defer cancel() - updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics()) + updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics()) if err != nil { pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err) } @@ -115,7 +114,7 @@ func (pm *podMetrics) refreshMetrics() error { } func (pm *podMetrics) stopRefreshLoop() { - pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod()) + pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetMetadata()) pm.stopOnce.Do(func() { close(pm.done) }) diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index 8a5561c0ed..8d84701fc5 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -29,7 +29,7 @@ import ( ) var ( - pod1Info = &datalayer.PodInfo{ + pod1Info = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1-rank-0", Namespace: "default", diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index 99f15a20f4..a884086c0c 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -52,7 +52,7 @@ type PodMetricsFactory struct { refreshMetricsInterval time.Duration } -func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics { +func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, metadata *datalayer.EndpointMetadata, ds datalayer.PoolInfo) datalayer.Endpoint { pm := &podMetrics{ pmc: f.pmc, ds: ds, @@ -60,9 +60,9 @@ func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalaye startOnce: sync.Once{}, stopOnce: sync.Once{}, done: make(chan struct{}), - logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName), + logger: log.FromContext(parentCtx).WithValues("endpoint", metadata.NamespacedName), } - pm.pod.Store(pod) + pm.metadata.Store(metadata) pm.metrics.Store(NewMetricsState()) pm.startRefreshLoop(parentCtx) diff --git a/pkg/epp/backend/pod.go b/pkg/epp/backend/pod.go index 324a7479aa..e244940429 100644 --- a/pkg/epp/backend/pod.go +++ b/pkg/epp/backend/pod.go @@ -20,4 +20,4 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) -type Pod = datalayer.PodInfo +type Pod = datalayer.EndpointMetadata diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 0fdecb674b..5ad96c12f9 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -123,7 +123,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } endpointPool1 := pool.InferencePoolToEndpointPool(pool1) - if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantEndpoints: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -141,7 +141,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } newEndpointPool1 := pool.InferencePoolToEndpointPool(newPool1) - if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -157,7 +157,7 @@ func TestInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } newEndpointPool1 = pool.InferencePoolToEndpointPool(newPool1) - if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -171,14 +171,14 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPods: []string{}}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantEndpoints: []string{}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } } type diffStoreParams struct { wantPool *datalayer.EndpointPool - wantPods []string + wantEndpoints []string wantObjectives []*v1alpha2.InferenceObjective } @@ -189,15 +189,15 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string { } // Default wantPods if not set because PodGetAll returns an empty slice when empty. - if params.wantPods == nil { - params.wantPods = []string{} + if params.wantEndpoints == nil { + params.wantEndpoints = []string{} } - gotPods := []string{} - for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + gotEndpoints := []string{} + for _, em := range datastore.PodList(backendmetrics.AllPodsPredicate) { + gotEndpoints = append(gotEndpoints, em.GetMetadata().NamespacedName.Name) } - if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { - return "pods:" + diff + if diff := cmp.Diff(params.wantEndpoints, gotEndpoints, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { + return "endpoints:" + diff } // Default wantModels if not set because ModelGetAll returns an empty slice when empty. @@ -343,8 +343,8 @@ func xDiffStore(datastore datastore.Datastore, params xDiffStoreParams) string { params.wantPods = []string{} } gotPods := []string{} - for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) { - gotPods = append(gotPods, pm.GetPod().NamespacedName.Name) + for _, em := range datastore.PodList(backendmetrics.AllPodsPredicate) { + gotPods = append(gotPods, em.GetMetadata().NamespacedName.Name) } if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" { return "pods:" + diff diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index efdb36b25f..39515ed735 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -213,8 +213,8 @@ func TestPodReconciler(t *testing.T) { } var gotPods []*corev1.Pod - for _, pm := range store.PodList(backendmetrics.AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + for _, em := range store.PodList(backendmetrics.AllPodsPredicate) { + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: em.GetMetadata().PodName, Namespace: em.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: em.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { diff --git a/pkg/epp/datalayer/collector.go b/pkg/epp/datalayer/collector.go index 86a8f7b4e4..f7be8125b4 100644 --- a/pkg/epp/datalayer/collector.go +++ b/pkg/epp/datalayer/collector.go @@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc started := false c.startOnce.Do(func() { - logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress()) + logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress()) c.ctx, c.cancel = context.WithCancel(ctx) started = true ready = make(chan struct{}) diff --git a/pkg/epp/datalayer/collector_test.go b/pkg/epp/datalayer/collector_test.go index f0655a7c72..a9a609351c 100644 --- a/pkg/epp/datalayer/collector_test.go +++ b/pkg/epp/datalayer/collector_test.go @@ -44,14 +44,14 @@ func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error { } func defaultEndpoint() Endpoint { - pod := &PodInfo{ + meta := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod-name", Namespace: "default", }, Address: "1.2.3.4:5678", } - ms := NewEndpoint(pod, nil) + ms := NewEndpoint(meta, nil) return ms } diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 2d262eb3ac..1ba7f939a9 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -21,10 +21,10 @@ import ( "sync/atomic" ) -// EndpointPodState allows management of the Pod related attributes. -type EndpointPodState interface { - GetPod() *PodInfo - UpdatePod(*PodInfo) +// EndpointMetaState allows management of the EndpointMetadata related attributes. +type EndpointMetaState interface { + GetMetadata() *EndpointMetadata + UpdateMetadata(*EndpointMetadata) GetAttributes() *Attributes } @@ -37,22 +37,22 @@ type EndpointMetricsState interface { // Endpoint represents an inference serving endpoint and its related attributes. type Endpoint interface { fmt.Stringer - EndpointPodState + EndpointMetaState EndpointMetricsState AttributeMap } // ModelServer is an implementation of the Endpoint interface. type ModelServer struct { - pod atomic.Pointer[PodInfo] + pod atomic.Pointer[EndpointMetadata] metrics atomic.Pointer[Metrics] attributes *Attributes } -// NewEndpoint returns a new ModelServer with the given PodInfo and Metrics. -func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { - if pod == nil { - pod = &PodInfo{} +// NewEndpoint returns a new ModelServer with the given EndpointMetadata and Metrics. +func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer { + if meta == nil { + meta = &EndpointMetadata{} } if metrics == nil { metrics = NewMetrics() @@ -60,7 +60,7 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { ep := &ModelServer{ attributes: NewAttributes(), } - ep.UpdatePod(pod) + ep.UpdateMetadata(meta) ep.UpdateMetrics(metrics) return ep } @@ -68,14 +68,14 @@ func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer { // String returns a representation of the ModelServer. For brevity, only names of // extended attributes are returned and not their values. func (srv *ModelServer) String() string { - return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys()) + return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys()) } -func (srv *ModelServer) GetPod() *PodInfo { +func (srv *ModelServer) GetMetadata() *EndpointMetadata { return srv.pod.Load() } -func (srv *ModelServer) UpdatePod(pod *PodInfo) { +func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) { srv.pod.Store(pod) } diff --git a/pkg/epp/datalayer/podinfo.go b/pkg/epp/datalayer/endpoint_metadata.go similarity index 63% rename from pkg/epp/datalayer/podinfo.go rename to pkg/epp/datalayer/endpoint_metadata.go index 7cbd6d8863..70c78e0dfd 100644 --- a/pkg/epp/datalayer/podinfo.go +++ b/pkg/epp/datalayer/endpoint_metadata.go @@ -30,8 +30,8 @@ type Addressable interface { GetNamespacedName() types.NamespacedName } -// PodInfo represents the relevant Kubernetes Pod state of an inference server. -type PodInfo struct { +// EndpointMetadata represents the relevant Kubernetes Pod state of an inference server. +type EndpointMetadata struct { NamespacedName types.NamespacedName PodName string Address string @@ -40,16 +40,16 @@ type PodInfo struct { Labels map[string]string } -// String returns a string representation of the pod. -func (p *PodInfo) String() string { - if p == nil { +// String returns a string representation of the endpoint. +func (e *EndpointMetadata) String() string { + if e == nil { return "" } - return fmt.Sprintf("%+v", *p) + return fmt.Sprintf("%+v", *e) } // Clone returns a full copy of the object. -func (p *PodInfo) Clone() *PodInfo { +func (p *EndpointMetadata) Clone() *EndpointMetadata { if p == nil { return nil } @@ -58,7 +58,7 @@ func (p *PodInfo) Clone() *PodInfo { for key, value := range p.Labels { clonedLabels[key] = value } - return &PodInfo{ + return &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: p.NamespacedName.Name, Namespace: p.NamespacedName.Namespace, @@ -71,22 +71,22 @@ func (p *PodInfo) Clone() *PodInfo { } } -// GetNamespacedName gets the namespace name of the Pod. -func (p *PodInfo) GetNamespacedName() types.NamespacedName { - return p.NamespacedName +// GetNamespacedName gets the namespace name of the Endpoint. +func (e *EndpointMetadata) GetNamespacedName() types.NamespacedName { + return e.NamespacedName } -// GetIPAddress returns the Pod's IP address. -func (p *PodInfo) GetIPAddress() string { - return p.Address +// GetIPAddress returns the Endpoint's IP address. +func (e *EndpointMetadata) GetIPAddress() string { + return e.Address } -// GetPort returns the Pod's inference port. -func (p *PodInfo) GetPort() string { - return p.Port +// GetPort returns the Endpoint's inference port. +func (e *EndpointMetadata) GetPort() string { + return e.Port } -// GetMetricsHost returns the pod's metrics host (ip:port) -func (p *PodInfo) GetMetricsHost() string { - return p.MetricsHost +// GetMetricsHost returns the Endpoint's metrics host (ip:port) +func (e *EndpointMetadata) GetMetricsHost() string { + return e.MetricsHost } diff --git a/pkg/epp/datalayer/podinfo_test.go b/pkg/epp/datalayer/endpoint_metadata_test.go similarity index 90% rename from pkg/epp/datalayer/podinfo_test.go rename to pkg/epp/datalayer/endpoint_metadata_test.go index baf804a22f..fb9bc7a93b 100644 --- a/pkg/epp/datalayer/podinfo_test.go +++ b/pkg/epp/datalayer/endpoint_metadata_test.go @@ -48,14 +48,14 @@ var ( PodIP: podip, }, } - expected = &PodInfo{ + expected = &EndpointMetadata{ NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}, Address: podip, Labels: labels, } ) -func TestPodInfoClone(t *testing.T) { +func TestEndpointMetadataClone(t *testing.T) { clone := expected.Clone() assert.NotSame(t, expected, clone) if diff := cmp.Diff(expected, clone); diff != "" { @@ -66,8 +66,8 @@ func TestPodInfoClone(t *testing.T) { assert.Equal(t, "prod", expected.Labels["env"], "mutating clone should not affect original") } -func TestPodInfoString(t *testing.T) { - podinfo := PodInfo{ +func TestEndpointMetadataString(t *testing.T) { + endpointMetadata := EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name, Namespace: pod.Namespace, @@ -79,7 +79,7 @@ func TestPodInfoString(t *testing.T) { Labels: labels, } - s := podinfo.String() + s := endpointMetadata.String() assert.Contains(t, s, name) assert.Contains(t, s, namespace) assert.Contains(t, s, podip) diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index 3a81763d50..78765095cf 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -46,7 +46,7 @@ type PoolInfo interface { // providing methods to allocate and retire endpoints. This can potentially be used for // pooled memory or other management chores in the implementation. type EndpointFactory interface { - NewEndpoint(parent context.Context, inpod *PodInfo, poolinfo PoolInfo) Endpoint + NewEndpoint(parent context.Context, inEnpointMetadata *EndpointMetadata, poolinfo PoolInfo) Endpoint ReleaseEndpoint(ep Endpoint) } @@ -71,8 +71,8 @@ func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Durati // NewEndpoint implements EndpointFactory.NewEndpoint. // Creates a new endpoint and starts its associated collector with its own ticker. // Guards against multiple concurrent calls for the same endpoint. -func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, _ PoolInfo) Endpoint { - key := types.NamespacedName{Namespace: inpod.GetNamespacedName().Namespace, Name: inpod.GetNamespacedName().Name} +func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inEndpointMetadata *EndpointMetadata, _ PoolInfo) Endpoint { + key := types.NamespacedName{Namespace: inEndpointMetadata.GetNamespacedName().Namespace, Name: inEndpointMetadata.GetNamespacedName().Name} logger := log.FromContext(parent).WithValues("pod", key) if _, ok := lc.collectors.Load(key); ok { @@ -80,7 +80,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, return nil } - endpoint := NewEndpoint(inpod, nil) + endpoint := NewEndpoint(inEndpointMetadata, nil) collector := NewCollector() // TODO or full backward compatibility, set the logger and poolinfo if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded { @@ -102,7 +102,7 @@ func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *PodInfo, // ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint // Stops the collector and cleans up resources for the endpoint func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) { - key := ep.GetPod().GetNamespacedName() + key := ep.GetMetadata().GetNamespacedName() if value, ok := lc.collectors.LoadAndDelete(key); ok { collector := value.(*Collector) diff --git a/pkg/epp/datalayer/factory_test.go b/pkg/epp/datalayer/factory_test.go index 35cce888da..66b724e54b 100644 --- a/pkg/epp/datalayer/factory_test.go +++ b/pkg/epp/datalayer/factory_test.go @@ -31,7 +31,7 @@ func TestFactory(t *testing.T) { source := &DummySource{} factory := NewEndpointFactory([]DataSource{source}, 100*time.Millisecond) - pod1 := &PodInfo{ + pod1 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", @@ -44,7 +44,7 @@ func TestFactory(t *testing.T) { dup := factory.NewEndpoint(context.Background(), pod1, nil) assert.Nil(t, dup, "expected to fail to create a duplicate collector") - pod2 := &PodInfo{ + pod2 := &EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", diff --git a/pkg/epp/datalayer/metrics/datasource.go b/pkg/epp/datalayer/metrics/datasource.go index 81723d4e0d..6b64d10325 100644 --- a/pkg/epp/datalayer/metrics/datasource.go +++ b/pkg/epp/datalayer/metrics/datasource.go @@ -98,8 +98,8 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error { // Collect is triggered by the data layer framework to fetch potentially new // MSP metrics data for an endpoint. func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error { - target := dataSrc.getMetricsEndpoint(ep.GetPod()) - families, err := dataSrc.client.Get(ctx, target, ep.GetPod()) + target := dataSrc.getMetricsEndpoint(ep.GetMetadata()) + families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata()) if err != nil { return err diff --git a/pkg/epp/datalayer/metrics/datasource_test.go b/pkg/epp/datalayer/metrics/datasource_test.go index 7c293753f5..d61380318d 100644 --- a/pkg/epp/datalayer/metrics/datasource_test.go +++ b/pkg/epp/datalayer/metrics/datasource_test.go @@ -50,7 +50,7 @@ func TestDatasource(t *testing.T) { ctx := context.Background() factory := datalayer.NewEndpointFactory([]datalayer.DataSource{source}, 100*time.Millisecond) - pod := &datalayer.PodInfo{ + pod := &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go index 27b1e07cd1..b2f2b68942 100644 --- a/pkg/epp/datalayer/metrics/extractor.go +++ b/pkg/epp/datalayer/metrics/extractor.go @@ -138,7 +138,7 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi } } - logger := log.FromContext(ctx).WithValues("pod", ep.GetPod().NamespacedName) + logger := log.FromContext(ctx).WithValues("pod", ep.GetMetadata().NamespacedName) if updated { clone.UpdateTime = time.Now() logger.V(logutil.TRACE).Info("Refreshed metrics", "updated", clone) diff --git a/pkg/epp/datalayer/metrics/logger_test.go b/pkg/epp/datalayer/metrics/logger_test.go index 4bf68cf0aa..b060a5e2f6 100644 --- a/pkg/epp/datalayer/metrics/logger_test.go +++ b/pkg/epp/datalayer/metrics/logger_test.go @@ -79,14 +79,14 @@ func TestLogger(t *testing.T) { assert.Contains(t, logOutput, "\"Stale metrics\": \"[]\"") } -var pod1 = &datalayer.PodInfo{ +var pod1 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod1", Namespace: "default", }, Address: "1.2.3.4:5678", } -var pod2 = &datalayer.PodInfo{ +var pod2 = &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: "pod2", Namespace: "default", diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 2ab2e98cb0..89aa33e61d 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -207,13 +207,13 @@ func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective { // /// Pods/endpoints APIs /// // TODO: add a flag for callers to specify the staleness threshold for metrics. // ref: https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/1046#discussion_r2246351694 -func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { - res := []backendmetrics.PodMetrics{} +func (ds *datastore) PodList(predicate func(datalayer.Endpoint) bool) []datalayer.Endpoint { + res := []datalayer.Endpoint{} ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if predicate(pm) { - res = append(res, pm) + ep := v.(datalayer.Endpoint) + if predicate(ep) { + res = append(res, ep) } return true }) @@ -235,14 +235,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { if len(ds.pool.TargetPorts) == 1 { modelServerMetricsPort = int(ds.modelServerMetricsPort) } - pods := []*datalayer.PodInfo{} + pods := []*datalayer.EndpointMetadata{} for idx, port := range ds.pool.TargetPorts { metricsPort := modelServerMetricsPort if metricsPort == 0 { metricsPort = port } pods = append(pods, - &datalayer.PodInfo{ + &datalayer.EndpointMetadata{ NamespacedName: types.NamespacedName{ Name: pod.Name + "-rank-" + strconv.Itoa(idx), Namespace: pod.Namespace, @@ -256,28 +256,28 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { } result := true - for _, podInfo := range pods { - var pm backendmetrics.PodMetrics - existing, ok := ds.pods.Load(podInfo.NamespacedName) + for _, endpointMetadata := range pods { + var ep datalayer.Endpoint + existing, ok := ds.pods.Load(endpointMetadata.NamespacedName) if !ok { - pm = ds.epf.NewEndpoint(ds.parentCtx, podInfo, ds) - ds.pods.Store(podInfo.NamespacedName, pm) + ep = ds.epf.NewEndpoint(ds.parentCtx, endpointMetadata, ds) + ds.pods.Store(endpointMetadata.NamespacedName, ep) result = false } else { - pm = existing.(backendmetrics.PodMetrics) + ep = existing.(backendmetrics.PodMetrics) } - // Update pod properties if anything changed. - pm.UpdatePod(podInfo) + // Update endpoint properties if anything changed. + ep.UpdateMetadata(endpointMetadata) } return result } func (ds *datastore) PodDelete(podName string) { ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if pm.GetPod().PodName == podName { + ep := v.(datalayer.Endpoint) + if ep.GetMetadata().PodName == podName { ds.pods.Delete(k) - ds.epf.ReleaseEndpoint(pm) + ds.epf.ReleaseEndpoint(ep) } return true }) @@ -309,10 +309,10 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err // Remove pods that don't belong to the pool or not ready any more. ds.pods.Range(func(k, v any) bool { - pm := v.(backendmetrics.PodMetrics) - if exist := activePods[pm.GetPod().PodName]; !exist { - logger.V(logutil.VERBOSE).Info("Removing pod", "pod", pm.GetPod()) - ds.PodDelete(pm.GetPod().PodName) + ep := v.(datalayer.Endpoint) + if exist := activePods[ep.GetMetadata().PodName]; !exist { + logger.V(logutil.VERBOSE).Info("Removing pod", "pod", ep.GetMetadata()) + ds.PodDelete(ep.GetMetadata().PodName) } return true }) diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 73beb1f24e..2a33bbccf5 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -408,7 +408,7 @@ func TestPods(t *testing.T) { test.op(ctx, ds) var gotPods []*corev1.Pod for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) { - pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}} + pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetMetadata().PodName, Namespace: pm.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetMetadata().GetIPAddress()}} gotPods = append(gotPods, pod) } if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) { @@ -418,18 +418,18 @@ func TestPods(t *testing.T) { } } -func TestPodInfo(t *testing.T) { +func TestEndpointMetadata(t *testing.T) { tests := []struct { - name string - op func(ctx context.Context, ds Datastore) - pool *v1.InferencePool - existingPods []*corev1.Pod - wantPodInfos []*datalayer.PodInfo + name string + op func(ctx context.Context, ds Datastore) + pool *v1.InferencePool + existingPods []*corev1.Pod + wantEndpointMetas []*datalayer.EndpointMetadata }{ { name: "Add new pod, no existing pods, should add", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -451,7 +451,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, no existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -485,7 +485,7 @@ func TestPodInfo(t *testing.T) { { name: "Add new pod, with existing pods, should add, multiple target ports", existingPods: []*corev1.Pod{pod1}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -543,7 +543,7 @@ func TestPodInfo(t *testing.T) { { name: "Delete the pod, multiple target ports", existingPods: []*corev1.Pod{pod1, pod2}, - wantPodInfos: []*datalayer.PodInfo{ + wantEndpointMetas: []*datalayer.EndpointMetadata{ { NamespacedName: types.NamespacedName{ Name: pod1.Name + "-rank-0", @@ -590,11 +590,11 @@ func TestPodInfo(t *testing.T) { } test.op(ctx, ds) - var gotPodInfos []*datalayer.PodInfo + var gotEndpointMetas []*datalayer.EndpointMetadata for _, pm := range ds.PodList(backendmetrics.AllPodsPredicate) { - gotPodInfos = append(gotPodInfos, pm.GetPod()) + gotEndpointMetas = append(gotEndpointMetas, pm.GetMetadata()) } - if diff := cmp.Diff(test.wantPodInfos, gotPodInfos, cmpopts.SortSlices(func(a, b *datalayer.PodInfo) bool { return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { + if diff := cmp.Diff(test.wantEndpointMetas, gotEndpointMetas, cmpopts.SortSlices(func(a, b *datalayer.EndpointMetadata) bool { return a.NamespacedName.Name < b.NamespacedName.Name })); diff != "" { t.Errorf("ConvertTo() mismatch (-want +got):\n%s", diff) } }) diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index ec3def1646..e20a37d733 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -74,7 +74,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { prometheus.GaugeValue, float64(pod.GetMetrics().WaitingQueueSize), pool.Name, - pod.GetPod().NamespacedName.Name, + pod.GetMetadata().NamespacedName.Name, ) } } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index c4f4f1c1b9..5fdcecc552 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -233,7 +233,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet podTotalCount := 0 podFilteredList := d.datastore.PodList(func(pm backendmetrics.PodMetrics) bool { podTotalCount++ - if _, found := endpoints[pm.GetPod().GetIPAddress()]; found { + if _, found := endpoints[pm.GetMetadata().GetIPAddress()]; found { return true } return false @@ -277,9 +277,9 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch pm := make([]schedulingtypes.Pod, len(pods)) for i, pod := range pods { if pod.GetAttributes() != nil { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} } else { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetMetadata().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} } } @@ -336,7 +336,7 @@ func (d *Director) GetRandomPod() *backend.Pod { } number := rand.Intn(len(pods)) pod := pods[number] - return pod.GetPod() + return pod.GetMetadata() } func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index f361303c83..ddfb3ada6e 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -699,7 +699,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) { got := director.getCandidatePodsForScheduling(context.Background(), test.metadata) diff := cmp.Diff(test.output, got, cmpopts.SortSlices(func(a, b backendmetrics.PodMetrics) bool { - return a.GetPod().NamespacedName.String() < b.GetPod().NamespacedName.String() + return a.GetMetadata().NamespacedName.String() < b.GetMetadata().NamespacedName.String() })) if diff != "" { t.Errorf("Unexpected output (-want +got): %v", diff) diff --git a/pkg/epp/saturationdetector/saturationdetector.go b/pkg/epp/saturationdetector/saturationdetector.go index 46b94b22cd..0891207e9e 100644 --- a/pkg/epp/saturationdetector/saturationdetector.go +++ b/pkg/epp/saturationdetector/saturationdetector.go @@ -91,8 +91,8 @@ func (d *Detector) IsSaturated(ctx context.Context, candidatePods []backendmetri for _, podMetric := range candidatePods { metrics := podMetric.GetMetrics() podNn := "unknown-pod" - if podMetric.GetPod() != nil { - podNn = podMetric.GetPod().NamespacedName.String() + if podMetric.GetMetadata() != nil { + podNn = podMetric.GetMetadata().NamespacedName.String() } if metrics == nil { diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index 2947785d22..e343820a2e 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -28,7 +28,7 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" - backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" @@ -307,14 +307,14 @@ func (m *Plugin) CleanUpInactivePods(ctx context.Context, handle plugins.Handle) case <-ctx.Done(): return case <-ticker.C: - activePodMetrics := handle.PodList(func(_ backendmetrics.PodMetrics) bool { return true }) - activePods := make(map[ServerID]struct{}, len(activePodMetrics)) - for _, pm := range activePodMetrics { - activePods[ServerID(pm.GetPod().NamespacedName)] = struct{}{} + activeEndpoints := handle.PodList(func(_ datalayer.Endpoint) bool { return true }) + activeEndpointNames := make(map[ServerID]struct{}, len(activeEndpoints)) + for _, ep := range activeEndpoints { + activeEndpointNames[ServerID(ep.GetMetadata().NamespacedName)] = struct{}{} } for _, pod := range m.indexer.Pods() { - if _, ok := activePods[pod]; !ok { + if _, ok := activeEndpointNames[pod]; !ok { m.indexer.RemovePod(pod) logger.Info("Removed pod not in active set", "pod", pod) }