Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 60 additions & 27 deletions pkg/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
informerv1 "k8s.io/client-go/informers/core/v1"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
Expand Down Expand Up @@ -88,8 +86,8 @@ type PodGroupManager struct {
permittedPG *gocache.Cache
// backedOffPG stores the podgorup name which failed scheudling recently.
backedOffPG *gocache.Cache
// podLister is pod lister
podLister listerv1.PodLister
// podInformer is pod informer
podInformer cache.SharedIndexInformer
// assignedPodsByPG stores the pods assumed or bound for podgroups
assignedPodsByPG map[string]sets.Set[string]
sync.RWMutex
Expand Down Expand Up @@ -124,7 +122,7 @@ func NewPodGroupManager(client client.Client, snapshotSharedLister framework.Sha
client: client,
snapshotSharedLister: snapshotSharedLister,
scheduleTimeout: scheduleTimeout,
podLister: podInformer.Lister(),
podInformer: podInformer.Informer(),
permittedPG: gocache.New(3*time.Second, 3*time.Second),
backedOffPG: gocache.New(10*time.Second, 10*time.Second),
assignedPodsByPG: map[string]sets.Set[string]{},
Expand Down Expand Up @@ -175,8 +173,8 @@ func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Durati
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pgMgr *PodGroupManager) ActivateSiblings(ctx context.Context, pod *corev1.Pod, state *framework.CycleState) {
lh := klog.FromContext(ctx)
pgName := util.GetPodGroupLabel(pod)
if pgName == "" {
pgFullName := util.GetPodGroupFullName(pod)
if pgFullName == "" {
return
}

Expand All @@ -186,29 +184,27 @@ func (pgMgr *PodGroupManager) ActivateSiblings(ctx context.Context, pod *corev1.
} else if s, ok := c.(*PermitState); !ok || !s.Activate {
return
}

pods, err := pgMgr.podLister.Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}),
)
indexer := pgMgr.podInformer.GetIndexer()
groupPods, err := podsBelongToGroup(indexer, pgFullName)
if err != nil {
lh.Error(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName)
lh.Error(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgFullName)
return
}

for i := range pods {
if pods[i].UID == pod.UID {
pods = append(pods[:i], pods[i+1:]...)
break
siblings := make([]*corev1.Pod, 0, len(groupPods))
for i := range groupPods {
tmpPod := groupPods[i]
if tmpPod.UID != pod.UID {
siblings = append(siblings, tmpPod)
}
}

if len(pods) != 0 {
if len(siblings) != 0 {
if c, err := state.Read(framework.PodsToActivateKey); err == nil {
if s, ok := c.(*framework.PodsToActivate); ok {
s.Lock()
for _, pod := range pods {
namespacedName := GetNamespacedName(pod)
s.Map[namespacedName] = pod
for _, sibling := range siblings {
namespacedName := GetNamespacedName(sibling)
s.Map[namespacedName] = sibling
}
s.Unlock()
}
Expand All @@ -231,17 +227,15 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
if _, exist := pgMgr.backedOffPG.Get(pgFullName); exist {
return fmt.Errorf("podGroup %v failed recently", pgFullName)
}

pods, err := pgMgr.podLister.Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}),
)
indexer := pgMgr.podInformer.GetIndexer()
siblings, err := podsBelongToGroup(indexer, pgFullName)
if err != nil {
return fmt.Errorf("podLister list pods failed: %w", err)
}

if len(pods) < int(pg.Spec.MinMember) {
if len(siblings) < int(pg.Spec.MinMember) {
return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+
"current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember)
"current pods number: %v, minMember of group: %v", pod.Name, len(siblings), pg.Spec.MinMember)
}

if pg.Spec.MinResources == nil {
Expand Down Expand Up @@ -272,6 +266,45 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) er
return nil
}

// podsBelongToGroup returns pods indexed by the PodGroup's full name. It falls back to a
// linear scan when the expected label index is not registered on the informer.
func podsBelongToGroup(indexer cache.Indexer, pgFullName string) ([]*corev1.Pod, error) {
if _, hasIndex := indexer.GetIndexers()[util.LabelIndexerName]; hasIndex {
podsObj, err := indexer.ByIndex(util.LabelIndexerName, pgFullName)
if err != nil {
return nil, err
}
return castPods(podsObj), nil
}

return filterPods(indexer.List(), pgFullName), nil
}

func castPods(objs []interface{}) []*corev1.Pod {
pods := make([]*corev1.Pod, 0, len(objs))
for i := range objs {
pod, ok := objs[i].(*corev1.Pod)
if ok {
pods = append(pods, pod)
}
}
return pods
}

func filterPods(objs []interface{}, pgFullName string) []*corev1.Pod {
pods := make([]*corev1.Pod, 0, len(objs))
for i := range objs {
pod, ok := objs[i].(*corev1.Pod)
if !ok {
continue
}
if util.GetPodGroupFullName(pod) == pgFullName {
pods = append(pods, pod)
}
}
return pods
}

// Permit permits a pod to run, if the minMember match, it would send a signal to chan.
func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status {
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
Expand Down
2 changes: 1 addition & 1 deletion pkg/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestPreFilter(t *testing.T) {
pgMgr := &PodGroupManager{
client: client,
snapshotSharedLister: tu.NewFakeSharedLister(tt.pendingPods, nodes),
podLister: podInformer.Lister(),
podInformer: podInformer.Informer(),
scheduleTimeout: &scheduleTimeout,
permittedPG: newCache(),
backedOffPG: newCache(),
Expand Down
5 changes: 4 additions & 1 deletion pkg/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func New(ctx context.Context, obj runtime.Object, handle framework.Handle) (fram
}

// Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning.
handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
handle.SharedInformerFactory().Core().V1().Pods().Informer().AddIndexers(cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
util.LabelIndexerName: util.NewIndexByLabelAndNamespace(v1alpha1.PodGroupLabel),
})

scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second
pgMgr := core.NewPodGroupManager(
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/client_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ package util
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
LabelIndexerName = "label-indexer"
)

// NewClientWithCachedReader returns a controller runtime Client with cache-baked client.
func NewClientWithCachedReader(ctx context.Context, config *rest.Config, scheme *runtime.Scheme) (client.Client, cache.Cache, error) {
ccache, err := cache.New(config, cache.Options{
Expand All @@ -30,3 +35,19 @@ func NewClientWithCachedReader(ctx context.Context, config *rest.Config, scheme
})
return c, ccache, err
}

// NewIndexByLabelAndNamespace returns an indexer function for indexing pods by label and namespace.
func NewIndexByLabelAndNamespace(label string) func(obj interface{}) ([]string, error) {
return func(obj interface{}) ([]string, error) {
labels := obj.(metav1.Object).GetLabels()
if labels == nil {
return nil, nil
}
if labels[label] == "" {
return nil, nil
}
namespace := obj.(metav1.Object).GetNamespace()

return []string{namespace + "/" + labels[label]}, nil
}
}