DRA: scheduler event handlers via assume cache #124595

Open · wants to merge 5 commits into base: master
13 changes: 7 additions & 6 deletions pkg/scheduler/eventhandlers.go
@@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -288,6 +289,7 @@ func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
resourceClaimCache *assumecache.AssumeCache,
gvkMap map[framework.GVK]framework.ActionType,
) error {
var (
@@ -456,12 +458,11 @@ func addAllEventHandlers(
}
case framework.ResourceClaim:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
// No need to wait for this cache to be
// populated. If a claim is not yet in the
// cache, scheduling will get retried once it
// is.
resourceClaimCache.AddEventHandler(buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"))
}
case framework.ResourceClass:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
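The informer-based registration removed here returned a handler registration, which addAllEventHandlers collects so the scheduler can wait for the initial cache sync. The assume cache's AddEventHandler returns nothing, hence the new comment that no such wait is needed. A minimal sketch of why the swap is drop-in, assuming buildEvtResHandler returns a cache.ResourceEventHandler (the interface both registration paths accept):

    // Sketch only: the exact return type of buildEvtResHandler is assumed here.
    handler := buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim")

    // Old path: register on the shared informer and keep the registration so
    // the scheduler can later wait for the handler's initial sync.
    // reg, err := informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(handler)

    // New path: register on the assume cache; events are delivered against the
    // cache's (possibly assumed) view of each claim, and no sync wait is tracked.
    resourceClaimCache.AddEventHandler(handler)
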
52 changes: 51 additions & 1 deletion pkg/scheduler/eventhandlers_test.go
@@ -26,9 +26,12 @@ import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"

"k8s.io/apimachinery/pkg/runtime"
@@ -38,6 +41,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"

"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
Expand All @@ -46,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func TestNodeAllocatableChanged(t *testing.T) {
@@ -362,6 +367,7 @@ func TestAddAllEventHandlers(t *testing.T) {
tests := []struct {
name string
gvkMap map[framework.GVK]framework.ActionType
enableDRA bool
expectStaticInformers map[reflect.Type]bool
expectDynamicInformers map[schema.GroupVersionResource]bool
}{
@@ -375,6 +381,44 @@
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "DRA events disabled",
gvkMap: map[framework.GVK]framework.ActionType{
framework.PodSchedulingContext: framework.Add,
framework.ResourceClaim: framework.Add,
framework.ResourceClass: framework.Add,
framework.ResourceClaimParameters: framework.Add,
framework.ResourceClassParameters: framework.Add,
},
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
reflect.TypeOf(&v1.Namespace{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "DRA events enabled",
gvkMap: map[framework.GVK]framework.ActionType{
framework.PodSchedulingContext: framework.Add,
framework.ResourceClaim: framework.Add,
framework.ResourceClass: framework.Add,
framework.ResourceClaimParameters: framework.Add,
framework.ResourceClassParameters: framework.Add,
},
enableDRA: true,
expectStaticInformers: map[reflect.Type]bool{
reflect.TypeOf(&v1.Pod{}): true,
reflect.TypeOf(&v1.Node{}): true,
reflect.TypeOf(&v1.Namespace{}): true,
reflect.TypeOf(&resourcev1alpha2.PodSchedulingContext{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClaim{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClaimParameters{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClass{}): true,
reflect.TypeOf(&resourcev1alpha2.ResourceClassParameters{}): true,
},
expectDynamicInformers: map[schema.GroupVersionResource]bool{},
},
{
name: "add GVKs handlers defined in framework dynamically",
gvkMap: map[framework.GVK]framework.ActionType{
@@ -433,6 +477,7 @@ func TestAddAllEventHandlers(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DynamicResourceAllocation, tt.enableDRA)
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -447,8 +492,13 @@ func TestAddAllEventHandlers(t *testing.T) {

dynclient := dyfake.NewSimpleDynamicClient(scheme)
dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
var resourceClaimCache *assumecache.AssumeCache
Member commented:
It seems we didn't test the DRA path in this test, because the feature gate is always disabled.

Contributor Author replied:
Good catch. I added new test cases which exercise the informer creation more completely.

The feature gate check in eventhandlers.go is a bit redundant: the plugin itself never asks for any of these events when it is disabled.
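
For reference, a rough sketch of the plugin-side behavior being described: with the feature gate off, the plugin's EventsToRegister reports no cluster events, so gvkMap never contains the DRA resources and the case branches in eventhandlers.go are never reached. The exact implementation in the dynamicresources plugin is not shown in this PR and is assumed here:

    // Rough sketch, not the actual plugin code.
    func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint {
    	if !pl.enabled {
    		// DRA disabled: no events requested, so addAllEventHandlers never
    		// sees framework.ResourceClaim and friends in gvkMap.
    		return nil
    	}
    	return []framework.ClusterEventWithHint{
    		{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
    		// ... PodSchedulingContext, ResourceClass, parameter objects ...
    	}
    }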

if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
}

if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil {
t.Fatalf("Add event handlers failed, error = %v", err)
}

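The assertion part of TestAddAllEventHandlers is outside this diff; a hypothetical sketch of how the expectStaticInformers column could be checked (the real test's helpers and comparison may differ):

    informerFactory.Start(ctx.Done())
    gotInformers := informerFactory.WaitForCacheSync(ctx.Done()) // map[reflect.Type]bool
    for informerType, want := range tt.expectStaticInformers {
    	if _, ok := gotInformers[informerType]; ok != want {
    		t.Errorf("informer for %v: registered = %v, want %v", informerType, ok, want)
    	}
    }
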
6 changes: 6 additions & 0 deletions pkg/scheduler/framework/interface.go
@@ -38,6 +38,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

// NodeScoreList declares a list of nodes and their scores.
@@ -701,6 +702,11 @@ type Handle interface {

SharedInformerFactory() informers.SharedInformerFactory

// ResourceClaimCache returns an assume cache of ResourceClaim objects
// which gets populated by the shared informer factory and the dynamic resources
// plugin.
ResourceClaimCache() *assumecache.AssumeCache

// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status

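A plugin obtains the shared cache through this handle at construction time; the dynamicresources hunk below does exactly that. As a usage sketch, assuming the Get and Assume methods of the assumecache package (the key format, field updates, and error handling here are illustrative only):

    claimCache := fh.ResourceClaimCache()
    obj, err := claimCache.Get("default/my-claim") // hypothetical namespace/name key
    if err == nil {
    	claim := obj.(*resourcev1alpha2.ResourceClaim).DeepCopy()
    	// After deciding on an allocation during Reserve, the plugin can make the
    	// result visible to later scheduling cycles before the API server confirms it.
    	claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} // illustrative allocation
    	_ = claimCache.Assume(claim)
    }
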
@@ -343,7 +343,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
return &dynamicResources{}, nil
}

logger := klog.FromContext(ctx)
pl := &dynamicResources{
enabled: true,
fh: fh,
Expand All @@ -355,7 +354,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
claimAssumeCache: fh.ResourceClaimCache(),
}

return pl, nil
@@ -597,21 +596,6 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
//
// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
// check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO).
//
// There is a small race here:
// - The dynamicresources plugin allocates claim A and updates the assume cache.
// - A second pod gets marked as unschedulable based on that assume cache.
// - Before the informer cache here catches up, the pod runs, terminates and
// the claim gets deallocated without ever sending the claim status with
// allocation to the scheduler.
// - The comparison below is for a *very* old claim with no allocation and the
// new claim where the allocation is already removed again, so no
// RemovedClaimAllocation event gets emitted.
//
// This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30.
// TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache
// into the event mechanism. This can be tackled together with adding autoscaler
// support, which also needs to do something with the assume cache.
logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.Queue, nil
}
@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -1319,10 +1320,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))

tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)

assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
opts := []runtime.Option{
runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(tc.informerFactory),
runtime.WithResourceClaimCache(assumeCache),
}
fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
if err != nil {
25 changes: 20 additions & 5 deletions pkg/scheduler/framework/runtime/framework.go
@@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/util/slice"
)

@@ -71,11 +72,12 @@ type frameworkImpl struct {
// pluginsMap contains all plugins, by name.
pluginsMap map[string]framework.Plugin

clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
logger klog.Logger
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
resourceClaimCache *assumecache.AssumeCache
logger klog.Logger

metricsRecorder *metrics.MetricAsyncRecorder
profileName string
@@ -126,6 +128,7 @@ type frameworkOptions struct {
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
resourceClaimCache *assumecache.AssumeCache
snapshotSharedLister framework.SharedLister
metricsRecorder *metrics.MetricAsyncRecorder
podNominator framework.PodNominator
@@ -176,6 +179,13 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
}
}

// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
return func(o *frameworkOptions) {
o.resourceClaimCache = resourceClaimCache
}
}

// WithSnapshotSharedLister sets the SharedLister of the snapshot.
func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
return func(o *frameworkOptions) {
@@ -259,6 +269,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
resourceClaimCache: options.resourceClaimCache,
metricsRecorder: options.metricsRecorder,
extenders: options.extenders,
PodNominator: options.podNominator,
@@ -1598,6 +1609,10 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
return f.informerFactory
}

func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
return f.resourceClaimCache
}

func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
pgSet := sets.Set[string]{}

10 changes: 9 additions & 1 deletion pkg/scheduler/scheduler.go
@@ -48,6 +48,7 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

const (
@@ -293,11 +294,18 @@ func New(ctx context.Context,
snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)

var resourceClaimCache *assumecache.AssumeCache
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
}

profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithResourceClaimCache(resourceClaimCache),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithParallelism(int(options.parallelism)),
@@ -356,7 +364,7 @@ func New(ctx context.Context,
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()

if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
}
