Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scheduler/cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler/cleaner"
schedulerServer "github.com/seldonio/seldon-core/scheduler/v2/pkg/server"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store"

Check failure on line 41 in scheduler/cmd/scheduler/main.go

View workflow job for this annotation

GitHub Actions / lint

could not import github.com/seldonio/seldon-core/scheduler/v2/pkg/store (-: # github.com/seldonio/seldon-core/scheduler/v2/pkg/store

Check failure on line 41 in scheduler/cmd/scheduler/main.go

View workflow job for this annotation

GitHub Actions / lint

could not import github.com/seldonio/seldon-core/scheduler/v2/pkg/store (-: # github.com/seldonio/seldon-core/scheduler/v2/pkg/store
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/pipeline"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/synchroniser"
Expand Down Expand Up @@ -262,7 +262,7 @@
}

// Create stores
ss := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
ss := store.NewModelServerService(logger, store.NewLocalSchedulerStore(), eventHub)
ps := pipeline.NewPipelineStore(logger, eventHub, ss)
es := experiment.NewExperimentServer(logger, eventHub, ss, ps)
cleaner := cleaner.NewVersionCleaner(ss, logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func benchmarkModelUpdate(
eventHub, err := coordinator.NewEventHub(logger)
require.NoError(b, err)

memoryStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
memoryStore := store.NewModelServerService(logger, store.NewLocalSchedulerStore(), eventHub)
pipelineStore := pipeline.NewPipelineStore(logger, eventHub, memoryStore)
ip, err := NewIncrementalProcessor(
"some node",
Expand Down
8 changes: 4 additions & 4 deletions scheduler/pkg/envoy/processor/incremental_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestRollingUpdate(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
modelStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil)
modelStore := store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil)
xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}, nil)
g.Expect(err).To(BeNil())
inc := &IncrementalProcessor{
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestDraining(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
modelStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil)
modelStore := store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil)
xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}, nil)
g.Expect(err).To(BeNil())
inc := &IncrementalProcessor{
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestModelSync(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
modelStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil)
modelStore := store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil)
xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}, nil)
g.Expect(err).To(BeNil())
inc := &IncrementalProcessor{
Expand Down Expand Up @@ -808,7 +808,7 @@ func TestEnvoySettings(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
logger := log.New()
eventHub, _ := coordinator.NewEventHub(logger)
memoryStore := store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), eventHub)
memoryStore := store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), eventHub)
xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{Host: "pipeline", GrpcPort: 1, HttpPort: 2}, nil)
g.Expect(err).To(BeNil())
inc := &IncrementalProcessor{
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/envoy/processor/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestFetch(t *testing.T) {

logger := log.New()

memoryStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), nil)
memoryStore := store.NewModelServerService(logger, store.NewLocalSchedulerStore(), nil)
pipelineHandler := pipeline.NewPipelineStore(logger, nil, memoryStore)

xdsCache, err := xdscache.NewSeldonXDSCache(log.New(), &xdscache.PipelineGatewayDetails{}, nil)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/pkg/kafka/dataflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ func createTestScheduler(t *testing.T, serverName string) (*ChainerServer, *coor

eventHub, _ := coordinator.NewEventHub(logger)

schedulerStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
schedulerStore := store.NewModelServerService(logger, store.NewLocalSchedulerStore(), eventHub)
pipelineServer := pipeline.NewPipelineStore(logger, eventHub, schedulerStore)

data :=
Expand Down
4 changes: 2 additions & 2 deletions scheduler/pkg/server/control_plane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func TestStartServerStream(t *testing.T) {
{
name: "ok",
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 10 * time.Millisecond,
},
},
{
name: "timeout",
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 1 * time.Millisecond,
},
Expand Down
12 changes: 6 additions & 6 deletions scheduler/pkg/server/server_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestModelsStatusStream(t *testing.T) {
},
},
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 10 * time.Millisecond,
},
Expand All @@ -157,7 +157,7 @@ func TestModelsStatusStream(t *testing.T) {
},
},
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 1 * time.Millisecond,
},
Expand Down Expand Up @@ -763,7 +763,7 @@ func TestServersStatusStream(t *testing.T) {
},
},
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 10 * time.Millisecond,
},
Expand Down Expand Up @@ -799,7 +799,7 @@ func TestServersStatusStream(t *testing.T) {
},
},
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 10 * time.Millisecond,
},
Expand Down Expand Up @@ -836,7 +836,7 @@ func TestServersStatusStream(t *testing.T) {
},
},
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 10 * time.Millisecond,
},
Expand All @@ -851,7 +851,7 @@ func TestServersStatusStream(t *testing.T) {
},
},
server: &SchedulerServer{
modelStore: store.NewMemoryStore(log.New(), store.NewLocalSchedulerStore(), nil),
modelStore: store.NewModelServerService(log.New(), store.NewLocalSchedulerStore(), nil),
logger: log.New(),
timeout: 1 * time.Millisecond,
},
Expand Down
6 changes: 3 additions & 3 deletions scheduler/pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestLoadModel(t *testing.T) {
eventHub, err := coordinator.NewEventHub(logger)
g.Expect(err).To(BeNil())

schedulerStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
schedulerStore := store.NewModelServerService(logger, store.NewLocalSchedulerStore(), eventHub)
experimentServer := experiment.NewExperimentServer(logger, eventHub, nil, nil)
pipelineServer := pipeline.NewPipelineStore(logger, eventHub, schedulerStore)
sync := synchroniser.NewSimpleSynchroniser(time.Duration(10 * time.Millisecond))
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestUnloadModel(t *testing.T) {
log.SetLevel(log.DebugLevel)
eventHub, err := coordinator.NewEventHub(logger)
g.Expect(err).To(BeNil())
schedulerStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
schedulerStore := store.NewModelServerService(logger, store.NewLocalSchedulerStore(), eventHub)
experimentServer := experiment.NewExperimentServer(logger, eventHub, nil, nil)
pipelineServer := pipeline.NewPipelineStore(logger, eventHub, schedulerStore)
mockAgent := &mockAgentHandler{}
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestServerNotify(t *testing.T) {
log.SetLevel(log.DebugLevel)
eventHub, err := coordinator.NewEventHub(logger)
g.Expect(err).To(BeNil())
schedulerStore := store.NewMemoryStore(logger, store.NewLocalSchedulerStore(), eventHub)
schedulerStore := store.NewModelServerService(logger, store.NewLocalSchedulerStore(), eventHub)
sync := synchroniser.NewSimpleSynchroniser(time.Duration(10 * time.Millisecond))
scheduler := scheduler2.NewSimpleScheduler(logger,
schedulerStore,
Expand Down
Loading
Loading