Skip to content

[WIP] refactor datastore struct #641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func run() error {

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{MetricMapping: mapping}, *refreshMetricsInterval)
// Setup runner.
datastore := datastore.NewDatastore(ctx, pmf)
datastore := datastore.NewDatastore(pmf)

serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/controller/inferencemodel_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestInferenceModelReconciler(t *testing.T) {
WithIndex(&v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName).
Build()
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
ds := datastore.NewDatastore(t.Context(), pmf)
ds := datastore.NewDatastore(pmf)
for _, m := range test.modelsInStore {
ds.ModelSetIfOlder(m)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/controller/inferencepool_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestInferencePoolReconciler(t *testing.T) {
ctx := context.Background()

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
datastore := datastore.NewDatastore(ctx, pmf)
datastore := datastore.NewDatastore(pmf)
inferencePoolReconciler := &InferencePoolReconciler{PoolNamespacedName: namespacedName, Client: fakeClient, Datastore: datastore}

// Step 1: Inception, only ready pods matching pool1 are added to the store.
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/controller/pod_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, err
}

c.updateDatastore(logger, pod, pool)
c.updateDatastore(ctx, logger, pod, pool)
return ctrl.Result{}, nil
}

Expand All @@ -69,13 +69,13 @@ func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(c)
}

func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) {
func (c *PodReconciler) updateDatastore(ctx context.Context, logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) {
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
if !pod.DeletionTimestamp.IsZero() || !c.Datastore.PoolLabelsMatch(pod.Labels) || !podIsReady(pod) {
logger.V(logutil.DEBUG).Info("Pod removed or not added", "name", namespacedName)
c.Datastore.PodDelete(namespacedName)
} else {
if c.Datastore.PodUpdateOrAddIfNotExist(pod, pool) {
if c.Datastore.PodUpdateOrAddIfNotExist(ctx, pod, pool) {
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
} else {
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/controller/pod_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ func TestPodReconciler(t *testing.T) {
Build()

// Configure the initial state of the datastore.
store := datastore.NewDatastore(t.Context(), pmf)
store := datastore.NewDatastore(pmf)
store.PoolSet(test.pool)
for _, pod := range test.existingPods {
store.PodUpdateOrAddIfNotExist(pod, pool)
store.PodUpdateOrAddIfNotExist(t.Context(), pod, pool)
}

podReconciler := &PodReconciler{Client: fakeClient, Datastore: store}
Expand Down
13 changes: 5 additions & 8 deletions pkg/epp/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ type Datastore interface {
PodGetAll() []backendmetrics.PodMetrics
// PodList lists pods matching the given predicate.
PodList(func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool
PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod, pool *v1alpha2.InferencePool) bool
PodDelete(namespacedName types.NamespacedName)
PodResyncAll(ctx context.Context, ctrlClient client.Client, pool *v1alpha2.InferencePool)

// Clears the store state, happens when the pool gets deleted.
Clear()
}

func NewDatastore(parentCtx context.Context, pmf *backendmetrics.PodMetricsFactory) *datastore {
func NewDatastore(pmf *backendmetrics.PodMetricsFactory) *datastore {
store := &datastore{
parentCtx: parentCtx,
poolAndModelsMu: sync.RWMutex{},
models: make(map[string]*v1alpha2.InferenceModel),
pods: &sync.Map{},
Expand All @@ -80,8 +79,6 @@ func NewDatastore(parentCtx context.Context, pmf *backendmetrics.PodMetricsFacto
}

type datastore struct {
// parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore.
parentCtx context.Context
// poolAndModelsMu is used to synchronize access to pool and the models map.
poolAndModelsMu sync.RWMutex
pool *v1alpha2.InferencePool
Expand Down Expand Up @@ -228,15 +225,15 @@ func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []b
return res
}

func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool {
func (ds *datastore) PodUpdateOrAddIfNotExist(ctx context.Context, pod *corev1.Pod, pool *v1alpha2.InferencePool) bool {
namespacedName := types.NamespacedName{
Name: pod.Name,
Namespace: pod.Namespace,
}
var pm backendmetrics.PodMetrics
existing, ok := ds.pods.Load(namespacedName)
if !ok {
pm = ds.pmf.NewPodMetrics(ds.parentCtx, pod, ds)
pm = ds.pmf.NewPodMetrics(ctx, pod, ds)
ds.pods.Store(namespacedName, pm)
} else {
pm = existing.(backendmetrics.PodMetrics)
Expand All @@ -262,7 +259,7 @@ func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client,
if podIsReady(&pod) {
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
activePods[pod.Name] = true
if ds.PodUpdateOrAddIfNotExist(&pod, pool) {
if ds.PodUpdateOrAddIfNotExist(ctx, &pod, pool) {
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
} else {
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)
Expand Down
8 changes: 4 additions & 4 deletions pkg/epp/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestPool(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
datastore := NewDatastore(context.Background(), pmf)
datastore := NewDatastore(pmf)
datastore.PoolSet(tt.inferencePool)
gotPool, gotErr := datastore.PoolGet()
if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestModel(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
ds := NewDatastore(t.Context(), pmf)
ds := NewDatastore(pmf)
for _, m := range test.existingModels {
ds.ModelSetIfOlder(m)
}
Expand Down Expand Up @@ -318,10 +318,10 @@ func TestMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
ds := NewDatastore(ctx, pmf)
ds := NewDatastore(pmf)
ds.PoolSet(inferencePool)
for _, pod := range test.storePods {
ds.PodUpdateOrAddIfNotExist(pod, inferencePool)
ds.PodUpdateOrAddIfNotExist(ctx, pod, inferencePool)
}
assert.EventuallyWithT(t, func(t *assert.CollectT) {
got := ds.PodGetAll()
Expand Down
2 changes: 1 addition & 1 deletion test/integration/epp/hermetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1628,7 +1628,7 @@ func BeforeSuite() func() {
pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond)
// Adjust from defaults
serverRunner.PoolName = "vllm-llama3-8b-instruct-pool"
serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)
serverRunner.Datastore = datastore.NewDatastore(pmf)
serverRunner.SecureServing = false

if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil {
Expand Down