register Node/UpdateNodeTaint event to plugins which have Node/Add only and don't have Node/UpdateNodeTaint
sanposhiho committed Mar 10, 2024
1 parent 95875b7 commit a0389b3
Showing 13 changed files with 222 additions and 92 deletions.
@@ -304,7 +304,7 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint
{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange},
// A resource might depend on node labels for topology filtering.
// A new or updated node may make pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
// A pod might be waiting for a class to get created or modified.
{Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}},
}
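The plugin changes in this commit all follow the same pattern: framework.ActionType is a bit mask, so appending | framework.UpdateNodeTaint simply widens the set of Node updates each plugin is requeued on. Below is a minimal, self-contained sketch of that matching idea; the type and constant values are illustrative only, not the framework's real definitions.

package main

import "fmt"

// ActionType mirrors the idea of framework.ActionType: a bit mask of actions.
// The constant values below are illustrative, not the framework's real values.
type ActionType int64

const (
	Add ActionType = 1 << iota
	Delete
	UpdateNodeLabel
	UpdateNodeTaint
)

// covers reports whether a registered mask includes the incoming action.
func covers(registered, incoming ActionType) bool {
	return registered&incoming != 0
}

func main() {
	registered := Add | UpdateNodeLabel | UpdateNodeTaint
	fmt.Println(covers(registered, UpdateNodeTaint)) // true: taint updates now requeue these pods
	fmt.Println(covers(registered, Delete))          // false: node deletions stay ignored here
}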
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/plugins/interpodaffinity/plugin.go
@@ -65,7 +65,7 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint
// - Add. An unschedulable Pod may fail due to violating pod-affinity constraints,
// adding an assigned Pod may make it schedulable.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
}
}

@@ -148,7 +148,7 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
// Node add|delete|update may lead to a topology key change,
// and make pods that are in scheduling schedulable or unschedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
}
}

@@ -103,7 +103,7 @@ func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint {
// Pods may fail to find available PVs because the node labels do not
// match the storage class's allowed topologies or PV's node affinity.
// A new or updated node may make pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
// We rely on CSI node to translate in-tree PV to CSI.
{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}},
// When CSIStorageCapacity is enabled, pods may become schedulable
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/plugins/volumezone/volume_zone.go
@@ -280,7 +280,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
// Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}},
// A new node or updating a node's volume zone labels may make a pod schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
// A new pvc may make a pod schedulable.
// Because all fields except `spec.resources` are immutable, pvc update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
11 changes: 10 additions & 1 deletion pkg/scheduler/framework/types.go
@@ -72,7 +72,16 @@ const (
// - a Pod that is deleted
// - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle.
// - an existing Pod that was unscheduled but gets scheduled to a Node.
Pod GVK = "Pod"
Pod GVK = "Pod"
// A note about the NodeAdd event and the UpdateNodeTaint event:
// The NodeAdd QueueingHint isn't always called because of an internal feature called preCheck.
// That's definitely not something plugin developers expect,
// and registering the UpdateNodeTaint event is the only mitigation for now.
// So, kube-scheduler registers the UpdateNodeTaint event for plugins that have the NodeAdd event but don't have the UpdateNodeTaint event.
// This hurts requeuing efficiency, but it's still a lot better than some Pods getting stuck in the
// unschedulable pod pool.
// This behavior will be removed when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175
Node GVK = "Node"
PersistentVolume GVK = "PersistentVolume"
PersistentVolumeClaim GVK = "PersistentVolumeClaim"
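As a concrete illustration of the note above, consider a hypothetical plugin (not part of this commit) that declares only Node/Add in its EventsToRegister. With this change, the scheduler also subscribes it to Node/UpdateNodeTaint behind the scenes, so a taint removal can still requeue its pods even when preCheck suppresses the NodeAdd path.

// examplePlugin is a hypothetical plugin used only to illustrate the note above.
func (pl *examplePlugin) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		// Only Node/Add is declared here; buildQueueingHintMap adds an implicit
		// Node/UpdateNodeTaint registration with the default queueing hint.
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
	}
}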
13 changes: 13 additions & 0 deletions pkg/scheduler/internal/queue/scheduling_queue.go
@@ -118,6 +118,7 @@ type SchedulingQueue interface {
AssignedPodAdded(logger klog.Logger, pod *v1.Pod)
AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod)
PendingPods() ([]*v1.Pod, string)
PodsInActiveQ() []*v1.Pod
// Close closes the SchedulingQueue so that the goroutine which is
// waiting to pop items can exit gracefully.
Close()
@@ -1227,6 +1228,18 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(logger klog
return podsToMove
}

// PodsInActiveQ returns all the Pods in the activeQ.
// This function is only used in tests.
func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod {
p.lock.RLock()
defer p.lock.RUnlock()
var result []*v1.Pod
for _, pInfo := range p.activeQ.List() {
result = append(result, pInfo.(*framework.QueuedPodInfo).Pod)
}
return result
}

var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v"

// PendingPods returns all the pending pods in the queue; accompanied by a debugging string
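The new PodsInActiveQ accessor exists so tests can observe requeuing directly. Below is a hedged sketch of a small helper a test could build on it; the helper is not part of the commit, and the SchedulingQueue value is assumed to be constructed elsewhere in the test.

// podInActiveQ is an illustrative test helper: it reports whether a pod with
// the given name/namespace is currently in the activeQ, using the
// PodsInActiveQ accessor added above.
func podInActiveQ(q SchedulingQueue, name, namespace string) bool {
	for _, p := range q.PodsInActiveQ() {
		if p.Name == name && p.Namespace == namespace {
			return true
		}
	}
	return false
}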
4 changes: 2 additions & 2 deletions pkg/scheduler/schedule_one.go
@@ -62,8 +62,8 @@ const (
numberOfHighestScoredNodesToReport = 3
)

// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// ScheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) ScheduleOne(ctx context.Context) {
logger := klog.FromContext(ctx)
podInfo, err := sched.NextPod(logger)
if err != nil {
14 changes: 7 additions & 7 deletions pkg/scheduler/schedule_one_test.go
@@ -811,7 +811,7 @@ func TestSchedulerScheduleOne(t *testing.T) {
if err != nil {
t.Fatal(err)
}
sched.scheduleOne(ctx)
sched.ScheduleOne(ctx)
<-called
if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
t.Errorf("assumed pod: wanted %v, got %v", e, a)
@@ -884,7 +884,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
// We use conflicting pod ports to incur a fit predicate failure if the first pod is not removed.
secondPod := podWithPort("bar", "", 8080)
queuedPodStore.Add(secondPod)
scheduler.scheduleOne(ctx)
scheduler.ScheduleOne(ctx)
select {
case b := <-bindingChan:
expectBinding := &v1.Binding{
@@ -921,7 +921,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
// queuedPodStore: [bar:8080]
// cache: [(assumed)foo:8080]

scheduler.scheduleOne(ctx)
scheduler.ScheduleOne(ctx)
select {
case err := <-errChan:
expectErr := &framework.FitError{
@@ -954,7 +954,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
}

queuedPodStore.Add(secondPod)
scheduler.scheduleOne(ctx)
scheduler.ScheduleOne(ctx)
select {
case b := <-bindingChan:
expectBinding := &v1.Binding{
@@ -1030,7 +1030,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
scheduler, _, errChan := setupTestScheduler(ctx, t, queuedPodStore, scache, informerFactory, nil, fns...)

queuedPodStore.Add(podWithTooBigResourceRequests)
scheduler.scheduleOne(ctx)
scheduler.ScheduleOne(ctx)
select {
case err := <-errChan:
expectErr := &framework.FitError{
@@ -1160,7 +1160,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
if err != nil {
t.Fatal(err)
}
s.scheduleOne(ctx)
s.ScheduleOne(ctx)
// Wait for pod to succeed or fail scheduling
select {
case <-eventChan:
@@ -3481,7 +3481,7 @@ func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queue
// queuedPodStore: [foo:8080]
// cache: []

scheduler.scheduleOne(ctx)
scheduler.ScheduleOne(ctx)
// queuedPodStore: []
// cache: [(assumed)foo:8080]

32 changes: 31 additions & 1 deletion pkg/scheduler/scheduler.go
@@ -389,17 +389,47 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei
// cannot be moved by any regular cluster event.
// So, we can just ignore such EventsToRegister here.

registerNodeAdded := false
registerNodeTaintUpdated := false
for _, event := range events {
fn := event.QueueingHintFn
if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
fn = defaultQueueingHintFn
}

if event.Event.Resource == framework.Node {
if event.Event.ActionType&framework.Add != 0 {
registerNodeAdded = true
}
if event.Event.ActionType&framework.UpdateNodeTaint != 0 {
registerNodeTaintUpdated = true
}
}

queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
PluginName: e.Name(),
QueueingHintFn: fn,
})
}
if registerNodeAdded && !registerNodeTaintUpdated {
// Temporary fix for the issue https://github.com/kubernetes/kubernetes/issues/109437
// The NodeAdded QueueingHint isn't always called because of preCheck.
// That's definitely not something plugin developers expect,
// and registering the UpdateNodeTaint event is the only mitigation for now.
//
// So, here we register the UpdateNodeTaint event for plugins that have the NodeAdded event but don't have the UpdateNodeTaint event.
// This hurts requeuing efficiency, but it's still a lot better than some Pods getting stuck in the
// unschedulable pod pool.
// This behavior will be removed when we remove the preCheck feature.
// See: https://github.com/kubernetes/kubernetes/issues/110175
queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] =
append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}],
&internalqueue.QueueingHintFunction{
PluginName: e.Name(),
QueueingHintFn: defaultQueueingHintFn,
},
)
}
}
return queueingHintMap
}
@@ -415,7 +445,7 @@ func (sched *Scheduler) Run(ctx context.Context) {
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)

<-ctx.Done()
sched.SchedulingQueue.Close()
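To make the new branch in buildQueueingHintMap easier to follow, here is a hedged, self-contained sketch of the rule it implements, using simplified stand-in types rather than the framework's real ones: a plugin subscribed to Node/Add but not Node/UpdateNodeTaint gets an extra UpdateNodeTaint registration with the default hint.

package main

import "fmt"

type actionType uint64

const (
	nodeAdd actionType = 1 << iota
	nodeUpdateTaint
)

// autoRegisterTaintUpdate mirrors the fallback above: if the plugin's events
// cover Node/Add but not Node/UpdateNodeTaint, append an implicit
// UpdateNodeTaint registration so taint changes can requeue its pods.
func autoRegisterTaintUpdate(events []actionType) []actionType {
	var hasAdd, hasTaintUpdate bool
	for _, e := range events {
		hasAdd = hasAdd || e&nodeAdd != 0
		hasTaintUpdate = hasTaintUpdate || e&nodeUpdateTaint != 0
	}
	if hasAdd && !hasTaintUpdate {
		events = append(events, nodeUpdateTaint)
	}
	return events
}

func main() {
	got := autoRegisterTaintUpdate([]actionType{nodeAdd})
	fmt.Println(len(got)) // 2: the implicit Node/UpdateNodeTaint registration was appended
}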
13 changes: 11 additions & 2 deletions pkg/scheduler/scheduler_test.go
@@ -655,6 +655,9 @@ func Test_buildQueueingHintMap(t *testing.T) {
{Resource: framework.Node, ActionType: framework.Add}: {
{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
},
{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
},
},
},
{
@@ -668,6 +671,9 @@
{Resource: framework.Node, ActionType: framework.Add}: {
{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate.
},
{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
},
},
},
{
@@ -685,6 +691,9 @@
{Resource: framework.Node, ActionType: framework.Add}: {
{PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
},
{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
{PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
},
},
},
}
@@ -791,7 +800,7 @@ func Test_UnionedGVKs(t *testing.T) {
Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins
},
want: map[framework.GVK]framework.ActionType{
framework.Node: framework.Add,
framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
},
},
{
@@ -821,7 +830,7 @@
},
want: map[framework.GVK]framework.ActionType{
framework.Pod: framework.Add,
framework.Node: framework.Add,
framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered.
},
},
{
