diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 6373ee166401e..ef2d43dfeb146 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -304,7 +304,7 @@ func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint {Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange}, // A resource might depend on node labels for topology filtering. // A new or updated node may make pods schedulable. - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, // A pod might be waiting for a class to get created or modified. {Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}}, } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go index 6f6c3c8feb6dc..3978aeb294410 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/plugin.go @@ -65,7 +65,7 @@ func (pl *InterPodAffinity) EventsToRegister() []framework.ClusterEventWithHint // - Add. An unschedulable Pod may fail due to violating pod-affinity constraints, // adding an assigned Pod may make it schedulable. {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}}, - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, } } diff --git a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go index c96db37d02524..ce764219db126 100644 --- a/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go +++ b/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go @@ -148,7 +148,7 @@ func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange}, // Node add|delete|update maybe lead an topology key changed, // and make these pod in scheduling schedulable or unschedulable. 
- {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange}, } } diff --git a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go index a02162d2a4554..354b805dd5c74 100644 --- a/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go +++ b/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go @@ -103,7 +103,7 @@ func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint { // Pods may fail to find available PVs because the node labels do not // match the storage class's allowed topologies or PV's node affinity. // A new or updated node may make pods schedulable. - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, // We rely on CSI node to translate in-tree PV to CSI. {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}}, // When CSIStorageCapacity is enabled, pods may become schedulable diff --git a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go index c1893aa63cc73..52d3d2ce9eecd 100644 --- a/pkg/scheduler/framework/plugins/volumezone/volume_zone.go +++ b/pkg/scheduler/framework/plugins/volumezone/volume_zone.go @@ -280,7 +280,7 @@ func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint { // Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored. {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}}, // A new node or updating a node's volume zone labels may make a pod schedulable. - {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}}, + {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}}, // A new pvc may make a pod schedulable. // Due to fields are immutable except `spec.resources`, pvc update events are ignored. {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}}, diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index cc839b3e3eef9..13b708b07700a 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -72,7 +72,16 @@ const ( // - a Pod that is deleted // - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle. // - an existing Pod that was unscheduled but gets scheduled to a Node. - Pod GVK = "Pod" + Pod GVK = "Pod" + // A note about NodeAdd event and UpdateNodeTaint event: + // NodeAdd QueueingHint isn't always called because of the internal feature called preCheck. + // It's definitely not something expected for plugin developers, + // and registering UpdateNodeTaint event is the only mitigation for now. + // So, kube-scheduler registers the UpdateNodeTaint event for plugins that have the NodeAdd event but don't have the UpdateNodeTaint event.
+ // It has a bad impact on requeuing efficiency, but it's a lot better than some Pods being stuck in the + // unschedulable pod pool. + // This behavior will be removed when we remove the preCheck feature. + // See: https://github.com/kubernetes/kubernetes/issues/110175 Node GVK = "Node" PersistentVolume GVK = "PersistentVolume" PersistentVolumeClaim GVK = "PersistentVolumeClaim" diff --git a/pkg/scheduler/internal/queue/scheduling_queue.go b/pkg/scheduler/internal/queue/scheduling_queue.go index c6a693a8f5905..115f81afc13db 100644 --- a/pkg/scheduler/internal/queue/scheduling_queue.go +++ b/pkg/scheduler/internal/queue/scheduling_queue.go @@ -118,6 +118,7 @@ type SchedulingQueue interface { AssignedPodAdded(logger klog.Logger, pod *v1.Pod) AssignedPodUpdated(logger klog.Logger, oldPod, newPod *v1.Pod) PendingPods() ([]*v1.Pod, string) + PodsInActiveQ() []*v1.Pod // Close closes the SchedulingQueue so that the goroutine which is // waiting to pop items can exit gracefully. Close() @@ -1227,6 +1228,18 @@ func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(logger klog return podsToMove } +// PodsInActiveQ returns all the Pods in the activeQ. +// This function is only used in tests. +func (p *PriorityQueue) PodsInActiveQ() []*v1.Pod { + p.lock.RLock() + defer p.lock.RUnlock() + var result []*v1.Pod + for _, pInfo := range p.activeQ.List() { + result = append(result, pInfo.(*framework.QueuedPodInfo).Pod) + } + return result +} + var pendingPodsSummary = "activeQ:%v; backoffQ:%v; unschedulablePods:%v" // PendingPods returns all the pending pods in the queue; accompanied by a debugging string diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index ae4be17bb6362..992dfbc4a1c3c 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -62,8 +62,8 @@ const ( numberOfHighestScoredNodesToReport = 3 ) -// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. -func (sched *Scheduler) scheduleOne(ctx context.Context) { +// ScheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting. +func (sched *Scheduler) ScheduleOne(ctx context.Context) { logger := klog.FromContext(ctx) podInfo, err := sched.NextPod(logger) if err != nil { diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 4d9d1cc4be2de..8bd346901bb4c 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -811,7 +811,7 @@ func TestSchedulerScheduleOne(t *testing.T) { if err != nil { t.Fatal(err) } - sched.scheduleOne(ctx) + sched.ScheduleOne(ctx) <-called if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { t.Errorf("assumed pod: wanted %v, got %v", e, a) @@ -884,7 +884,7 @@ func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) { // We use conflicted pod ports to incur fit predicate failure if first pod not removed.
secondPod := podWithPort("bar", "", 8080) queuedPodStore.Add(secondPod) - scheduler.scheduleOne(ctx) + scheduler.ScheduleOne(ctx) select { case b := <-bindingChan: expectBinding := &v1.Binding{ @@ -921,7 +921,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { // queuedPodStore: [bar:8080] // cache: [(assumed)foo:8080] - scheduler.scheduleOne(ctx) + scheduler.ScheduleOne(ctx) select { case err := <-errChan: expectErr := &framework.FitError{ @@ -954,7 +954,7 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) { } queuedPodStore.Add(secondPod) - scheduler.scheduleOne(ctx) + scheduler.ScheduleOne(ctx) select { case b := <-bindingChan: expectBinding := &v1.Binding{ @@ -1030,7 +1030,7 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) { scheduler, _, errChan := setupTestScheduler(ctx, t, queuedPodStore, scache, informerFactory, nil, fns...) queuedPodStore.Add(podWithTooBigResourceRequests) - scheduler.scheduleOne(ctx) + scheduler.ScheduleOne(ctx) select { case err := <-errChan: expectErr := &framework.FitError{ @@ -1160,7 +1160,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) { if err != nil { t.Fatal(err) } - s.scheduleOne(ctx) + s.ScheduleOne(ctx) // Wait for pod to succeed or fail scheduling select { case <-eventChan: @@ -3481,7 +3481,7 @@ func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queue // queuedPodStore: [foo:8080] // cache: [] - scheduler.scheduleOne(ctx) + scheduler.ScheduleOne(ctx) // queuedPodStore: [] // cache: [(assumed)foo:8080] diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8cf3b52f78f5b..5efc1fc0a7801 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -389,17 +389,47 @@ func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.Queuei // cannot be moved by any regular cluster event. // So, we can just ignore such EventsToRegister here. + registerNodeAdded := false + registerNodeTaintUpdated := false for _, event := range events { fn := event.QueueingHintFn if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) { fn = defaultQueueingHintFn } + if event.Event.Resource == framework.Node { + if event.Event.ActionType&framework.Add != 0 { + registerNodeAdded = true + } + if event.Event.ActionType&framework.UpdateNodeTaint != 0 { + registerNodeTaintUpdated = true + } + } + queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{ PluginName: e.Name(), QueueingHintFn: fn, }) } + if registerNodeAdded && !registerNodeTaintUpdated { + // Temporary fix for the issue https://github.com/kubernetes/kubernetes/issues/109437 + // NodeAdd QueueingHint isn't always called because of preCheck. + // It's definitely not something expected for plugin developers, + // and registering UpdateNodeTaint event is the only mitigation for now. + // + // So, here we register the UpdateNodeTaint event for plugins that have the NodeAdd event but don't have the UpdateNodeTaint event. + // It has a bad impact on requeuing efficiency, but it's a lot better than some Pods being stuck in the + // unschedulable pod pool. + // This behavior will be removed when we remove the preCheck feature.
+ // See: https://github.com/kubernetes/kubernetes/issues/110175 + queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] = + append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}], + &internalqueue.QueueingHintFunction{ + PluginName: e.Name(), + QueueingHintFn: defaultQueueingHintFn, + }, + ) + } } return queueingHintMap } @@ -415,7 +445,7 @@ func (sched *Scheduler) Run(ctx context.Context) { // If there are no new pods to schedule, it will be hanging there // and if done in this goroutine it will be blocking closing // SchedulingQueue, in effect causing a deadlock on shutdown. - go wait.UntilWithContext(ctx, sched.scheduleOne, 0) + go wait.UntilWithContext(ctx, sched.ScheduleOne, 0) <-ctx.Done() sched.SchedulingQueue.Close() diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 01aa43fc63287..c98f07707ca10 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -655,6 +655,9 @@ func Test_buildQueueingHintMap(t *testing.T) { {Resource: framework.Node, ActionType: framework.Add}: { {PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn}, }, + {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: { + {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered. + }, }, }, { @@ -668,6 +671,9 @@ func Test_buildQueueingHintMap(t *testing.T) { {Resource: framework.Node, ActionType: framework.Add}: { {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // default queueing hint due to disabled feature gate. }, + {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: { + {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered. + }, }, }, { @@ -685,6 +691,9 @@ func Test_buildQueueingHintMap(t *testing.T) { {Resource: framework.Node, ActionType: framework.Add}: { {PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn}, }, + {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: { + {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn}, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered. + }, }, }, } @@ -791,7 +800,7 @@ func Test_UnionedGVKs(t *testing.T) { Disabled: []schedulerapi.Plugin{{Name: "*"}}, // disable default plugins }, want: map[framework.GVK]framework.ActionType{ - framework.Node: framework.Add, + framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered. }, }, { @@ -821,7 +830,7 @@ func Test_UnionedGVKs(t *testing.T) { }, want: map[framework.GVK]framework.ActionType{ framework.Pod: framework.Add, - framework.Node: framework.Add, + framework.Node: framework.Add | framework.UpdateNodeTaint, // When Node/Add is registered, Node/UpdateNodeTaint is automatically registered. 
}, }, { diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go index 42535552916a9..1658b3d1818b0 100644 --- a/test/integration/scheduler/queue_test.go +++ b/test/integration/scheduler/queue_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -178,88 +179,154 @@ func TestSchedulingGates(t *testing.T) { // TestCoreResourceEnqueue verify Pods failed by in-tree default plugins can be // moved properly upon their registered events. func TestCoreResourceEnqueue(t *testing.T) { - // Use zero backoff seconds to bypass backoffQ. - // It's intended to not start the scheduler's queue, and hence to - // not start any flushing logic. We will pop and schedule the Pods manually later. - testCtx := testutils.InitTestSchedulerWithOptions( - t, - testutils.InitTestAPIServer(t, "core-res-enqueue", nil), - 0, - scheduler.WithPodInitialBackoffSeconds(0), - scheduler.WithPodMaxBackoffSeconds(0), - ) - testutils.SyncSchedulerInformerFactory(testCtx) - - defer testCtx.Scheduler.SchedulingQueue.Close() + tests := []struct { + name string + // initialNode is the Node to be created at first. + initialNode *v1.Node + // initialPod is the Pod to be created at first if it's not empty. + initialPod *v1.Pod + // pods are the list of Pods to be created. + // All of them are expected to be unschedulable at first. + pods []*v1.Pod + // triggerFn is the function that triggers the event to move Pods. + triggerFn func(testCtx *testutils.TestContext) error + // wantRequeuedPods is the map of Pods that are expected to be requeued after triggerFn. + wantRequeuedPods sets.Set[string] + }{ + { + name: "Pod without a required toleration to a node isn't requeued to activeQ", + initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), + pods: []*v1.Pod{ + // - Pod1 doesn't have the required toleration and will be rejected by the TaintToleration plugin. + // (TaintToleration plugin is evaluated before NodeResourcesFit plugin.) + // - Pod2 has the required toleration, but requests a large amount of CPU - will be rejected by the NodeResourcesFit plugin. + st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(), + st.MakePod().Name("pod2").Toleration(v1.TaintNodeNotReady).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + // Trigger a NodeChange event by increasing CPU capacity. + // It makes Pod2 schedulable. + // Pod1 is not requeued because the Node is still unready and it doesn't have the required toleration. 
+ if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to update the node: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod2"), + }, + { + name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready", + initialNode: st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(), + initialPod: st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(), + pods: []*v1.Pod{ + // - Pod2 will be rejected by the PodAffinity plugin. + st.MakePod().Label("anti", "anti").Name("pod2").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Obj(), + }, + triggerFn: func(testCtx *testutils.TestContext) error { + // Trigger a NodeCreated event. + // Note that this Node has an un-ready taint, and ideally pod2 should be requeued because the only unschedulable plugin registered for pod2 is PodAffinity. + // However, due to preCheck, pod2 is not requeued to activeQ. + // It'll be fixed by the removal of preCheck in the future. + // https://github.com/kubernetes/kubernetes/issues/110175 + node := st.MakeNode().Name("fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj() + if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, st.MakeNode().Name("fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "foo", Effect: v1.TaintEffectNoSchedule}}).Obj(), metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create a new node: %w", err) + } - cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx - // Create one Node with a taint. - node := st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj() - node.Spec.Taints = []v1.Taint{{Key: "foo", Effect: v1.TaintEffectNoSchedule}} - if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { - t.Fatalf("Failed to create Node %q: %v", node.Name, err) + // As a mitigation for the issue described above, all plugins subscribing to the Node/Add event also register UpdateNodeTaint. + // So, removing the taint moves pod2 to activeQ. + node.Spec.Taints = nil + if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to remove taints off the node: %w", err) + } + return nil + }, + wantRequeuedPods: sets.New("pod2"), + }, } - // Create two Pods that are both unschedulable. - // - Pod1 is a best-effort Pod, but doesn't have the required toleration. - // - Pod2 requests a large amount of CPU resource that the node cannot fit. - // Note: Pod2 will fail the tainttoleration plugin b/c that's ordered prior to noderesources. - // - Pod3 has the required toleration, but requests a non-existing PVC.
- pod1 := st.MakePod().Namespace(ns).Name("pod1").Container("image").Obj() - pod2 := st.MakePod().Namespace(ns).Name("pod2").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj() - pod3 := st.MakePod().Namespace(ns).Name("pod3").Toleration("foo").PVC("pvc").Container("image").Obj() - for _, pod := range []*v1.Pod{pod1, pod2, pod3} { - if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil { - t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) - } - } + for _, featureEnabled := range []bool{false, true} { + for _, tt := range tests { + t.Run(fmt.Sprintf("%s [SchedulerQueueingHints enabled: %v]", tt.name, featureEnabled), func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, featureEnabled)() + + // Use zero backoff seconds to bypass backoffQ. + // It's intended to not start the scheduler's queue, and hence to + // not start any flushing logic. We will pop and schedule the Pods manually later. + testCtx := testutils.InitTestSchedulerWithOptions( + t, + testutils.InitTestAPIServer(t, "core-res-enqueue", nil), + 0, + scheduler.WithPodInitialBackoffSeconds(0), + scheduler.WithPodMaxBackoffSeconds(0), + ) + testutils.SyncSchedulerInformerFactory(testCtx) + + defer testCtx.Scheduler.SchedulingQueue.Close() + + cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx + // Create an initial Node. + if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err) + } - // Wait for the three pods to be present in the scheduling queue. - if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { - pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods() - return len(pendingPods) == 3, nil - }); err != nil { - t.Fatal(err) - } + if tt.initialPod != nil { + if _, err := cs.CoreV1().Pods(ns).Create(ctx, tt.initialPod, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create an initial Pod %q: %v", tt.initialPod.Name, err) + } + } - // Pop the three pods out. They should be unschedulable. - for i := 0; i < 3; i++ { - podInfo := testutils.NextPodOrDie(t, testCtx) - fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName] - if !ok { - t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) - } - // Schedule the Pod manually. - _, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod) - if fitError == nil { - t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name) - } - testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, framework.NewStatus(framework.Unschedulable).WithError(fitError), nil, time.Now()) - } + for _, pod := range tt.pods { + if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) + } + } - // Trigger a NodeTaintChange event. - // We expect this event to trigger moving the test Pod from unschedulablePods to activeQ. - node.Spec.Taints = nil - if _, err := cs.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil { - t.Fatalf("Failed to remove taints off the node: %v", err) - } + // Wait for the tt.pods to be present in the scheduling queue.
+ if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods() + return len(pendingPods) == len(tt.pods), nil + }); err != nil { + t.Fatal(err) + } - // Now we should be able to pop the Pod from activeQ again. - podInfo := testutils.NextPodOrDie(t, testCtx) - if podInfo.Attempts != 2 { - t.Fatalf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts) - } - if got := podInfo.Pod.Name; got != "pod1" { - t.Fatalf("Expected pod1 to be popped, but got %v", got) - } + t.Log("Confirmed Pods in the scheduling queue, starting to schedule them") + + // Pop all pods out. They should be unschedulable. + for i := 0; i < len(tt.pods); i++ { + testCtx.Scheduler.ScheduleOne(testCtx.Ctx) + } + // Wait for the tt.pods to still be present in the scheduling queue. + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods() + return len(pendingPods) == len(tt.pods), nil + }); err != nil { + t.Fatal(err) + } - // Pod2 and Pod3 are not expected to be popped out. - // - Although the failure reason has been lifted, Pod2 still won't be moved to active due to - // the node event's preCheckForNode(). - // - Regarding Pod3, the NodeTaintChange event is irrelevant with its scheduling failure. - podInfo = testutils.NextPod(t, testCtx) - if podInfo != nil { - t.Fatalf("Unexpected pod %v get popped out", podInfo.Pod.Name) + t.Log("finished initial scheduling attempts for all Pods, will trigger triggerFn") + + err := tt.triggerFn(testCtx) + if err != nil { + t.Fatalf("Failed to trigger the event: %v", err) + } + + t.Log("triggered tt.triggerFn, will check if tt.wantRequeuedPods are requeued") + + // Wait for the expected Pods to be requeued to activeQ. + var requeuedPods sets.Set[string] + if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + requeuedPods = sets.Set[string]{} // reset + for _, requeuedPod := range testCtx.Scheduler.SchedulingQueue.PodsInActiveQ() { + requeuedPods.Insert(requeuedPod.Name) + } + + return requeuedPods.Equal(tt.wantRequeuedPods), nil + }); err != nil { + t.Fatalf("Expected Pods %v to be requeued, but actually %v are requeued", tt.wantRequeuedPods, requeuedPods) + } + }) + } } } diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 14f65b2ff9dc3..a945e11cb39d2 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -1159,6 +1159,8 @@ func NextPodOrDie(t *testing.T, testCtx *TestContext) *schedulerframework.Queued } // NextPod returns the next Pod in the scheduler queue, with a 5 seconds timeout. +// Note that this function leaks a goroutine in the case of timeout; even after this function times out and returns, +// the goroutine made by this function keeps waiting to pop a pod from the queue. func NextPod(t *testing.T, testCtx *TestContext) *schedulerframework.QueuedPodInfo { t.Helper()
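Reviewer note: the sketch below is a minimal, self-contained illustration of the Node/Add -> Node/UpdateNodeTaint auto-registration added to buildQueueingHintMap above; it is not the actual scheduler code. ActionType, GVK, and ClusterEvent here are simplified local stand-ins for the k8s.io/kubernetes/pkg/scheduler/framework types (with made-up bit values), and the function returns a slice of events instead of appending QueueingHintFunctions with defaultQueueingHintFn to queueingHintMap. The bitmask checks mirror the ones in the hunk above.

package main

import "fmt"

// Simplified stand-ins for the scheduler framework's types; the real
// definitions live in pkg/scheduler/framework/types.go.
type ActionType int64

const (
	Add ActionType = 1 << iota
	Delete
	UpdateNodeLabel
	UpdateNodeTaint
)

type GVK string

const Node GVK = "Node"

type ClusterEvent struct {
	Resource   GVK
	ActionType ActionType
}

// autoRegisterUpdateNodeTaint mirrors the mitigation in buildQueueingHintMap:
// if a plugin subscribes to Node/Add but not to Node/UpdateNodeTaint, a
// Node/UpdateNodeTaint subscription is added for it, so that Pods rejected by
// that plugin don't get stuck when preCheck filters out the NodeAdd event.
func autoRegisterUpdateNodeTaint(events []ClusterEvent) []ClusterEvent {
	registerNodeAdded := false
	registerNodeTaintUpdated := false
	for _, ev := range events {
		if ev.Resource != Node {
			continue
		}
		if ev.ActionType&Add != 0 {
			registerNodeAdded = true
		}
		if ev.ActionType&UpdateNodeTaint != 0 {
			registerNodeTaintUpdated = true
		}
	}
	if registerNodeAdded && !registerNodeTaintUpdated {
		events = append(events, ClusterEvent{Resource: Node, ActionType: UpdateNodeTaint})
	}
	return events
}

func main() {
	// A plugin that registers only Node Add|UpdateNodeLabel (as VolumeZone did
	// before this change) ends up with an extra Node/UpdateNodeTaint entry.
	events := []ClusterEvent{{Resource: Node, ActionType: Add | UpdateNodeLabel}}
	fmt.Println(autoRegisterUpdateNodeTaint(events))
}

The bitmask check is what lets a combined registration such as Add | UpdateNodeLabel be detected as subscribing to Node/Add without also subscribing to UpdateNodeTaint, which is exactly the case the mitigation targets.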