scheduler: central ResourceClaim assume cache
This enables connecting the ResourceClaim event handler to the assume cache, which addresses a theoretical race condition.

It may also be useful for implementing autoscaler support, because the autoscaler can now modify the content of the cache.
pohly committed Apr 29, 2024
1 parent 75dd31d commit 2d66ba2
Showing 7 changed files with 51 additions and 31 deletions.
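The diffs below wire a single, centrally created ResourceClaim assume cache through the scheduler: scheduler.New() builds it, the cluster event handlers register against it, and the dynamicresources plugin receives it through the framework Handle. The following is a minimal sketch of that wiring as it would sit inside the scheduler codebase; the assumecache constructor and AddEventHandler signatures are assumed from their usage in this commit, and the handler bodies are placeholders.

```go
package scheduler

import (
	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"

	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

// wireResourceClaimCache sketches the central wiring introduced by this
// commit: one assume cache per scheduler, backed by the ResourceClaim
// informer, with event handlers attached to the cache rather than the
// raw informer.
func wireResourceClaimCache(logger klog.Logger, client clientset.Interface) *assumecache.AssumeCache {
	informerFactory := informers.NewSharedInformerFactory(client, 0)

	claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
	claimCache := assumecache.NewAssumeCache(logger, claimInformer, "ResourceClaim", "", nil)

	// Handlers registered here fire for the cache's view of a claim, which
	// includes assumed (not yet confirmed) updates. The bodies are
	// placeholders for the scheduler's queueing-hint callbacks.
	claimCache.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* move pods waiting on the claim */ },
		UpdateFunc: func(oldObj, newObj interface{}) { /* re-evaluate unschedulable pods */ },
		DeleteFunc: func(obj interface{}) { /* retry pods that referenced the claim */ },
	})

	return claimCache
}
```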
13 changes: 7 additions & 6 deletions pkg/scheduler/eventhandlers.go
@@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -288,6 +289,7 @@ func addAllEventHandlers(
sched *Scheduler,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
resourceClaimCache *assumecache.AssumeCache,
gvkMap map[framework.GVK]framework.ActionType,
) error {
var (
@@ -456,12 +458,11 @@ func addAllEventHandlers(
}
case framework.ResourceClaim:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if handlerRegistration, err = informerFactory.Resource().V1alpha2().ResourceClaims().Informer().AddEventHandler(
buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"),
); err != nil {
return err
}
handlers = append(handlers, handlerRegistration)
// No need to wait for this cache to be
// populated. If a claim is not yet in the
// cache, scheduling will get retried once it
// is.
resourceClaimCache.AddEventHandler(buildEvtResHandler(at, framework.ResourceClaim, "ResourceClaim"))
}
case framework.ResourceClass:
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
10 changes: 9 additions & 1 deletion pkg/scheduler/eventhandlers_test.go
@@ -29,6 +29,7 @@ import (
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2/ktesting"

"k8s.io/apimachinery/pkg/runtime"
@@ -38,6 +39,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"

"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
@@ -46,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

func TestNodeAllocatableChanged(t *testing.T) {
@@ -447,8 +450,13 @@ func TestAddAllEventHandlers(t *testing.T) {

dynclient := dyfake.NewSimpleDynamicClient(scheme)
dynInformerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynclient, 0)
var resourceClaimCache *assumecache.AssumeCache
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
}

if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, tt.gvkMap); err != nil {
if err := addAllEventHandlers(&testSched, informerFactory, dynInformerFactory, resourceClaimCache, tt.gvkMap); err != nil {
t.Fatalf("Add event handlers failed, error = %v", err)
}

5 changes: 5 additions & 0 deletions pkg/scheduler/framework/interface.go
@@ -38,6 +38,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

// NodeScoreList declares a list of nodes and their scores.
@@ -701,6 +702,10 @@ type Handle interface {

SharedInformerFactory() informers.SharedInformerFactory

// ResourceClaimCache returns an assume cache of ResourceClaim objects
// which gets populated by the shared informer factory.
ResourceClaimCache() *assumecache.AssumeCache

// RunFilterPluginsWithNominatedPods runs the set of configured filter plugins for nominated pod on the given node.
RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status

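With ResourceClaimCache() on the Handle, a plugin factory can pick up the shared cache instead of building its own; the dynamicresources change below does exactly that. A sketch of a hypothetical consuming plugin (the plugin type and name are illustrative, only fh.ResourceClaimCache() comes from this interface change):

```go
package claimplugin

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

// claimPlugin is a hypothetical plugin that consumes the central cache.
type claimPlugin struct {
	fh               framework.Handle
	claimAssumeCache *assumecache.AssumeCache
}

func (p *claimPlugin) Name() string { return "ClaimPlugin" }

// New no longer calls assumecache.NewAssumeCache itself; it reuses the
// scheduler's central ResourceClaim cache exposed via the Handle.
func New(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
	return &claimPlugin{
		fh:               fh,
		claimAssumeCache: fh.ResourceClaimCache(),
	}, nil
}
```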
18 changes: 1 addition & 17 deletions pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
@@ -343,7 +343,6 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
return &dynamicResources{}, nil
}

logger := klog.FromContext(ctx)
pl := &dynamicResources{
enabled: true,
fh: fh,
@@ -355,7 +354,7 @@ func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts fe
classParametersLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
resourceSliceLister: fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
claimNameLookup: resourceclaim.NewNameLookup(fh.ClientSet()),
claimAssumeCache: assumecache.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
claimAssumeCache: fh.ResourceClaimCache(),
}

return pl, nil
@@ -597,21 +596,6 @@ func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, po
//
// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
// check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO).
//
// There is a small race here:
// - The dynamicresources plugin allocates claim A and updates the assume cache.
// - A second pod gets marked as unschedulable based on that assume cache.
// - Before the informer cache here catches up, the pod runs, terminates and
// the claim gets deallocated without ever sending the claim status with
// allocation to the scheduler.
// - The comparison below is for a *very* old claim with no allocation and the
// new claim where the allocation is already removed again, so no
// RemovedClaimAllocation event gets emitted.
//
// This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30.
// TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache
// into the event mechanism. This can be tackled together with adding autoscaler
// support, which also needs to do something with the assume cache.
logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
return framework.Queue, nil
}
4 changes: 3 additions & 1 deletion pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go
@@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
st "k8s.io/kubernetes/pkg/scheduler/testing"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/ptr"
)
@@ -1319,10 +1320,11 @@ func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceCl
tc.client.PrependReactor("list", "resourceclassparameters", createListReactor(tc.client.Tracker(), "ResourceClassParameters"))

tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)

assumeCache := assumecache.NewAssumeCache(tCtx.Logger(), tc.informerFactory.Resource().V1alpha2().ResourceClaims().Informer(), "resource claim", "", nil)
opts := []runtime.Option{
runtime.WithClientSet(tc.client),
runtime.WithInformerFactory(tc.informerFactory),
runtime.WithResourceClaimCache(assumeCache),
}
fh, err := runtime.NewFramework(tCtx, nil, nil, opts...)
if err != nil {
25 changes: 20 additions & 5 deletions pkg/scheduler/framework/runtime/framework.go
@@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
"k8s.io/kubernetes/pkg/util/slice"
)

@@ -71,11 +72,12 @@ type frameworkImpl struct {
// pluginsMap contains all plugins, by name.
pluginsMap map[string]framework.Plugin

clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
logger klog.Logger
clientSet clientset.Interface
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
resourceClaimCache *assumecache.AssumeCache
logger klog.Logger

metricsRecorder *metrics.MetricAsyncRecorder
profileName string
@@ -126,6 +128,7 @@ type frameworkOptions struct {
kubeConfig *restclient.Config
eventRecorder events.EventRecorder
informerFactory informers.SharedInformerFactory
resourceClaimCache *assumecache.AssumeCache
snapshotSharedLister framework.SharedLister
metricsRecorder *metrics.MetricAsyncRecorder
podNominator framework.PodNominator
@@ -176,6 +179,13 @@ func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option
}
}

// WithResourceClaimCache sets the resource claim cache for the scheduling frameworkImpl.
func WithResourceClaimCache(resourceClaimCache *assumecache.AssumeCache) Option {
return func(o *frameworkOptions) {
o.resourceClaimCache = resourceClaimCache
}
}

// WithSnapshotSharedLister sets the SharedLister of the snapshot.
func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
return func(o *frameworkOptions) {
@@ -259,6 +269,7 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler
kubeConfig: options.kubeConfig,
eventRecorder: options.eventRecorder,
informerFactory: options.informerFactory,
resourceClaimCache: options.resourceClaimCache,
metricsRecorder: options.metricsRecorder,
extenders: options.extenders,
PodNominator: options.podNominator,
@@ -1598,6 +1609,10 @@ func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory
return f.informerFactory
}

func (f *frameworkImpl) ResourceClaimCache() *assumecache.AssumeCache {
return f.resourceClaimCache
}

func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
pgSet := sets.Set[string]{}

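Callers that construct a framework directly, such as the dynamicresources test setup above, pass the new option next to the existing ones. A sketch assuming the runtime.NewFramework call shape used in that test; the helper name is illustrative:

```go
package exampletest

import (
	"context"

	"k8s.io/client-go/informers"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"

	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

// newFrameworkWithClaimCache builds a framework whose plugins will see the
// given assume cache through Handle.ResourceClaimCache().
func newFrameworkWithClaimCache(ctx context.Context, logger klog.Logger, client clientset.Interface) (framework.Framework, error) {
	informerFactory := informers.NewSharedInformerFactory(client, 0)
	claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
	assumeCache := assumecache.NewAssumeCache(logger, claimInformer, "resource claim", "", nil)

	opts := []runtime.Option{
		runtime.WithClientSet(client),
		runtime.WithInformerFactory(informerFactory),
		runtime.WithResourceClaimCache(assumeCache), // option added by this commit
	}
	return runtime.NewFramework(ctx, nil, nil, opts...)
}
```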
7 changes: 6 additions & 1 deletion pkg/scheduler/scheduler.go
@@ -48,6 +48,7 @@ import (
internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

const (
@@ -293,11 +294,15 @@ func New(ctx context.Context,
snapshot := internalcache.NewEmptySnapshot()
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)

resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer()
resourceClaimCache := assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)

profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithKubeConfig(options.kubeConfig),
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithResourceClaimCache(resourceClaimCache),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
frameworkruntime.WithParallelism(int(options.parallelism)),
@@ -356,7 +361,7 @@ func New(ctx context.Context,
sched.NextPod = podQueue.Pop
sched.applyDefaultHandlers()

if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
return nil, fmt.Errorf("adding event handlers: %w", err)
}

