Skip to content

Commit 404a257

Browse files
committed
Implement error propagation for prepare data plugins
1 parent c8b6ba5 commit 404a257

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -366,27 +366,42 @@ func (d *Director) executePluginsAsDAG(ctx context.Context, request *schedulingt
366366
// Execute the DAG
367367

368368
// Channels to signal plugin execution completion.
369-
pluginExecuted := map[string]chan struct{}{}
369+
pluginExecuted := make(map[string]chan error)
370370
nameToNode := map[string]PrepareDataPlugin{}
371371
for _, plugin := range plugins {
372-
pluginExecuted[plugin.TypedName().String()] = make(chan struct{})
372+
pluginExecuted[plugin.TypedName().String()] = make(chan error)
373373
nameToNode[plugin.TypedName().String()] = plugin
374374
}
375375

376376
for pluginName, dependents := range dag {
377377
// Execute plugins based on dependencies.
378378
// Wait for the dependencies to complete before executing a plugin.
379-
go func() {
379+
go func() error {
380380
for _, dep := range dependents {
381-
<-pluginExecuted[dep]
381+
err, open := <-pluginExecuted[dep]
382+
if !open {
383+
continue
384+
}
385+
if err != nil {
386+
// If a dependency failed, propagate the error and do not execute this plugin.
387+
pluginExecuted[pluginName] <- fmt.Errorf("dependency plugin %s failed: %w", dep, err)
388+
return err
389+
}
382390
}
383-
nameToNode[pluginName].PrepareRequestData(ctx, request, pods)
384391
// Signal that the plugin has been executed.
385-
close(pluginExecuted[pluginName])
392+
defer close(pluginExecuted[pluginName])
393+
394+
return nameToNode[pluginName].PrepareRequestData(ctx, request, pods)
386395
}()
387396
}
388397
for pluginName := range dag {
389-
<-pluginExecuted[pluginName]
398+
err, open := <-pluginExecuted[pluginName]
399+
if !open {
400+
continue
401+
}
402+
if err != nil {
403+
return err
404+
}
390405
}
391406
return nil
392407
}

pkg/epp/requestcontrol/graph_util_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ func (m *mockPrepareDataPlugin) Consumes() map[string]any {
4444
return m.consumes
4545
}
4646

47-
func (m *mockPrepareDataPlugin) PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) {
47+
func (m *mockPrepareDataPlugin) PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error {
4848
pods[0].Put(mockProducedDataKey, mockProducedDataType{value: 42})
49+
return nil
4950
}
5051

5152
func TestPrepareDataGraph(t *testing.T) {

pkg/epp/requestcontrol/plugins.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type ResponseComplete interface {
6363
type PrepareDataPlugin interface {
6464
plugins.ProducerPlugin
6565
plugins.ConsumerPlugin
66-
PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod)
66+
PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error
6767
}
6868

6969
// AdmissionPlugin is called by the director after the prepare data phase and before scheduling.

0 commit comments

Comments
 (0)