From c752e89e98407839d7c098bdf3c35df7176ab76e Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Wed, 20 Mar 2024 20:49:46 +0000
Subject: [PATCH] Avoid unnecessary preemptions when candidates have the same
 timestamp

Change-Id: I68155a467880059c65deb75b00939b14551924cd
---
 pkg/scheduler/preemption/preemption.go        | 14 ++++-
 pkg/scheduler/preemption/preemption_test.go   | 18 +++++-
 pkg/util/testing/wrappers.go                  | 12 ++++-
 test/integration/scheduler/preemption_test.go | 52 ++++++++++++++++++-
 test/util/util.go                             | 26 ++++++++--
 5 files changed, 110 insertions(+), 12 deletions(-)

diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go
index 71dbdccfb1..36cb2555d4 100644
--- a/pkg/scheduler/preemption/preemption.go
+++ b/pkg/scheduler/preemption/preemption.go
@@ -389,6 +389,7 @@ func workloadFits(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue,
 }
 
 // candidatesOrdering criteria:
+// 0. Workloads already marked for preemption first.
 // 1. Workloads from other ClusterQueues in the cohort before the ones in the
 // same ClusterQueue as the preemptor.
 // 2. Workloads with lower priority first.
@@ -397,6 +398,11 @@ func candidatesOrdering(candidates []*workload.Info, cq string, now time.Time) func(i, j int) bool {
 	return func(i, j int) bool {
 		a := candidates[i]
 		b := candidates[j]
+		aEvicted := meta.IsStatusConditionTrue(a.Obj.Status.Conditions, kueue.WorkloadEvicted)
+		bEvicted := meta.IsStatusConditionTrue(b.Obj.Status.Conditions, kueue.WorkloadEvicted)
+		if aEvicted != bEvicted {
+			return aEvicted
+		}
 		aInCQ := a.ClusterQueue == cq
 		bInCQ := b.ClusterQueue == cq
 		if aInCQ != bInCQ {
@@ -407,7 +413,13 @@ func candidatesOrdering(candidates []*workload.Info, cq string, now time.Time) func(i, j int) bool {
 		if pa != pb {
 			return pa < pb
 		}
-		return quotaReservationTime(b.Obj, now).Before(quotaReservationTime(a.Obj, now))
+		timeA := quotaReservationTime(a.Obj, now)
+		timeB := quotaReservationTime(b.Obj, now)
+		if !timeA.Equal(timeB) {
+			return timeA.After(timeB)
+		}
+		// Arbitrary comparison for deterministic sorting.
+		return a.Obj.UID < b.Obj.UID
 	}
 }
 
diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go
index 36f948ec79..bdd8ae3fd1 100644
--- a/pkg/scheduler/preemption/preemption_test.go
+++ b/pkg/scheduler/preemption/preemption_test.go
@@ -1127,15 +1127,25 @@ func TestCandidatesOrdering(t *testing.T) {
 			Obj()),
 		workload.NewInfo(utiltesting.MakeWorkload("low", "").
 			ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
-			Priority(10).
 			Priority(-10).
 			Obj()),
 		workload.NewInfo(utiltesting.MakeWorkload("other", "").
 			ReserveQuota(utiltesting.MakeAdmission("other").Obj()).
 			Priority(10).
 			Obj()),
-		workload.NewInfo(utiltesting.MakeWorkload("old", "").
-			ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
+		workload.NewInfo(utiltesting.MakeWorkload("evicted", "").
+			SetOrReplaceCondition(metav1.Condition{
+				Type:   kueue.WorkloadEvicted,
+				Status: metav1.ConditionTrue,
+			}).
+			Obj()),
+		workload.NewInfo(utiltesting.MakeWorkload("old-a", "").
+			UID("old-a").
+			ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
+			Obj()),
+		workload.NewInfo(utiltesting.MakeWorkload("old-b", "").
+			UID("old-b").
+			ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
 			Obj()),
 		workload.NewInfo(utiltesting.MakeWorkload("current", "").
 			ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
@@ -1151,7 +1161,7 @@ func TestCandidatesOrdering(t *testing.T) {
 	for i, c := range candidates {
 		gotNames[i] = workload.Key(c.Obj)
 	}
-	wantCandidates := []string{"/other", "/low", "/current", "/old", "/high"}
+	wantCandidates := []string{"/evicted", "/other", "/low", "/current", "/old-a", "/old-b", "/high"}
 	if diff := cmp.Diff(wantCandidates, gotNames); diff != "" {
 		t.Errorf("Sorted with wrong order (-want,+got):\n%s", diff)
 	}
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index cf294e27fc..e74e18622e 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -82,6 +82,11 @@ func (w *WorkloadWrapper) Clone() *WorkloadWrapper {
 	return &WorkloadWrapper{Workload: *w.DeepCopy()}
 }
 
+func (w *WorkloadWrapper) UID(uid types.UID) *WorkloadWrapper {
+	w.Workload.UID = uid
+	return w
+}
+
 func (w *WorkloadWrapper) Finalizers(fin ...string) *WorkloadWrapper {
 	w.ObjectMeta.Finalizers = fin
 	return w
@@ -116,11 +121,16 @@ func (w *WorkloadWrapper) Active(a bool) *WorkloadWrapper {
 
 // ReserveQuota sets workload admission and adds a "QuotaReserved" status condition
 func (w *WorkloadWrapper) ReserveQuota(a *kueue.Admission) *WorkloadWrapper {
+	return w.ReserveQuotaAt(a, time.Now())
+}
+
+// ReserveQuotaAt sets workload admission and adds a "QuotaReserved" status condition with the given transition time
+func (w *WorkloadWrapper) ReserveQuotaAt(a *kueue.Admission, now time.Time) *WorkloadWrapper {
 	w.Status.Admission = a
 	w.Status.Conditions = []metav1.Condition{{
 		Type:               kueue.WorkloadQuotaReserved,
 		Status:             metav1.ConditionTrue,
-		LastTransitionTime: metav1.Now(),
+		LastTransitionTime: metav1.NewTime(now),
 		Reason:             "AdmittedByTest",
 		Message:            fmt.Sprintf("Admitted by ClusterQueue %s", w.Status.Admission.ClusterQueue),
 	}}
diff --git a/test/integration/scheduler/preemption_test.go b/test/integration/scheduler/preemption_test.go
index 5caf8b9245..3d92ef0188 100644
--- a/test/integration/scheduler/preemption_test.go
+++ b/test/integration/scheduler/preemption_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package scheduler
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/onsi/ginkgo/v2"
@@ -306,7 +307,7 @@ var _ = ginkgo.Describe("Preemption", func() {
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, betaCQ.Name, betaLowWl)
 		})
 
-		ginkgo.It("Should preempt all necessary workloads in concurrent scheduling", func() {
+		ginkgo.It("Should preempt all necessary workloads in concurrent scheduling with different priorities", func() {
 			ginkgo.By("Creating workloads in beta-cq that borrow quota")
 
 			betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
@@ -345,7 +346,7 @@ var _ = ginkgo.Describe("Preemption", func() {
 			util.FinishEvictionForWorkloads(ctx, k8sClient, betaMidWl)
 
 			// one of alpha-mid and gamma-mid should be admitted
-			gomega.Eventually(func() []*kueue.Workload { return util.FilterAdmittedWorkloads(ctx, k8sClient, alphaMidWl, gammaMidWl) }, util.Interval*4, util.Interval).Should(gomega.HaveLen(1))
+			util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, alphaMidWl, gammaMidWl)
 
 			// betaHighWl remains admitted
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, betaCQ.Name, betaHighWl)
@@ -356,6 +357,53 @@ var _ = ginkgo.Describe("Preemption", func() {
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, alphaCQ.Name, alphaMidWl)
 			util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, gammaCQ.Name, gammaMidWl)
 		})
+
+		ginkgo.It("Should preempt all necessary workloads in concurrent scheduling with the same priority", func() {
+			var betaWls []*kueue.Workload
+			for i := 0; i < 3; i++ {
+				wl := testing.MakeWorkload(fmt.Sprintf("beta-%d", i), ns.Name).
+					Queue(betaQ.Name).
+					Request(corev1.ResourceCPU, "2").
+					Obj()
+				gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
+				betaWls = append(betaWls, wl)
+			}
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaWls...)
+
+			ginkgo.By("Creating preempting workloads")
+
+			alphaWl := testing.MakeWorkload("alpha", ns.Name).
+				Queue(alphaQ.Name).
+				Request(corev1.ResourceCPU, "2").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, alphaWl)).To(gomega.Succeed())
+
+			gammaWl := testing.MakeWorkload("gamma", ns.Name).
+				Queue(gammaQ.Name).
+				Request(corev1.ResourceCPU, "2").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, gammaWl)).To(gomega.Succeed())
+
+			var evictedWorkloads []*kueue.Workload
+			gomega.Eventually(func() int {
+				evictedWorkloads = util.FilterEvictedWorkloads(ctx, k8sClient, betaWls...)
+				return len(evictedWorkloads)
+			}, util.Timeout, util.Interval).Should(gomega.Equal(1), "Number of evicted workloads")
+
+			ginkgo.By("Finishing eviction for first set of preempted workloads")
+			util.FinishEvictionForWorkloads(ctx, k8sClient, evictedWorkloads...)
+			util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, alphaWl, gammaWl)
+
+			gomega.Eventually(func() int {
+				evictedWorkloads = util.FilterEvictedWorkloads(ctx, k8sClient, betaWls...)
+				return len(evictedWorkloads)
+			}, util.Timeout, util.Interval).Should(gomega.Equal(2), "Number of evicted workloads")
+
+			ginkgo.By("Finishing eviction for second set of preempted workloads")
+			util.FinishEvictionForWorkloads(ctx, k8sClient, evictedWorkloads...)
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaWl, gammaWl)
+			util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, betaWls...)
+		})
 	})
 
 	ginkgo.Context("In a cohort with StrictFIFO", func() {
diff --git a/test/util/util.go b/test/util/util.go
index b340a63e47..41ab8fcc43 100644
--- a/test/util/util.go
+++ b/test/util/util.go
@@ -243,11 +243,21 @@ func ExpectWorkloadsToHaveQuotaReservation(ctx context.Context, k8sClient client
 }
 
 func FilterAdmittedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
+	return filterWorkloads(ctx, k8sClient, workload.HasQuotaReservation, wls...)
+}
+
+func FilterEvictedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
+	return filterWorkloads(ctx, k8sClient, func(wl *kueue.Workload) bool {
+		return apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted)
+	}, wls...)
+}
+
+func filterWorkloads(ctx context.Context, k8sClient client.Client, filter func(*kueue.Workload) bool, wls ...*kueue.Workload) []*kueue.Workload {
 	ret := make([]*kueue.Workload, 0, len(wls))
 	var updatedWorkload kueue.Workload
 	for _, wl := range wls {
 		err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)
-		if err == nil && workload.HasQuotaReservation(&updatedWorkload) {
+		if err == nil && filter(&updatedWorkload) {
 			ret = append(ret, wl)
 		}
 	}
@@ -273,17 +283,25 @@ func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
 }
 
 func ExpectWorkloadsToBeAdmitted(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
-	gomega.EventuallyWithOffset(1, func() int {
+	expectWorkloadsToBeAdmittedCountWithOffset(ctx, 2, k8sClient, len(wls), wls...)
+}
+
+func ExpectWorkloadsToBeAdmittedCount(ctx context.Context, k8sClient client.Client, count int, wls ...*kueue.Workload) {
+	expectWorkloadsToBeAdmittedCountWithOffset(ctx, 2, k8sClient, count, wls...)
+}
+
+func expectWorkloadsToBeAdmittedCountWithOffset(ctx context.Context, offset int, k8sClient client.Client, count int, wls ...*kueue.Workload) {
+	gomega.EventuallyWithOffset(offset, func() int {
 		admitted := 0
 		var updatedWorkload kueue.Workload
 		for _, wl := range wls {
-			gomega.ExpectWithOffset(1, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
+			gomega.ExpectWithOffset(offset, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
 			if apimeta.IsStatusConditionTrue(updatedWorkload.Status.Conditions, kueue.WorkloadAdmitted) {
 				admitted++
 			}
 		}
 		return admitted
-	}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are admitted")
+	}, Timeout, Interval).Should(gomega.Equal(count), "Not enough workloads are admitted")
 }
 
 func ExpectWorkloadToFinish(ctx context.Context, k8sClient client.Client, wlKey client.ObjectKey) {
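
Illustration (not part of the patch): a minimal, self-contained Go sketch of the ordering the updated candidatesOrdering comparator produces. The candidate struct, the less function, and every field name below are invented stand-ins for workload.Info; only the comparison criteria come from the patch: workloads already marked for preemption first, then workloads from other ClusterQueues in the cohort, then lower priority, then more recent quota reservation, and finally the UID as an arbitrary but deterministic tie-break.

	package main

	import (
		"fmt"
		"sort"
		"time"
	)

	// candidate is an invented stand-in for workload.Info, carrying only the
	// fields the comparator consults.
	type candidate struct {
		uid          string
		clusterQueue string
		priority     int32
		evicted      bool
		quotaSince   time.Time
	}

	// less mirrors the patched candidatesOrdering criteria.
	func less(a, b candidate, preemptorCQ string) bool {
		if a.evicted != b.evicted {
			return a.evicted // 0. already-evicted workloads first
		}
		aInCQ, bInCQ := a.clusterQueue == preemptorCQ, b.clusterQueue == preemptorCQ
		if aInCQ != bInCQ {
			return !aInCQ // 1. other ClusterQueues before the preemptor's own
		}
		if a.priority != b.priority {
			return a.priority < b.priority // 2. lower priority first
		}
		if !a.quotaSince.Equal(b.quotaSince) {
			return a.quotaSince.After(b.quotaSince) // 3. more recently admitted first
		}
		return a.uid < b.uid // 4. arbitrary, but deterministic
	}

	func main() {
		now := time.Now()
		cands := []candidate{
			{uid: "old-b", clusterQueue: "self", quotaSince: now},
			{uid: "other", clusterQueue: "other", priority: 10, quotaSince: now},
			{uid: "evicted", clusterQueue: "self", evicted: true, quotaSince: now},
			{uid: "old-a", clusterQueue: "self", quotaSince: now},
		}
		sort.Slice(cands, func(i, j int) bool { return less(cands[i], cands[j], "self") })
		for _, c := range cands {
			fmt.Println(c.uid) // evicted, other, old-a, old-b
		}
	}

Before this change, candidates with identical quota-reservation timestamps compared as equal, so concurrent scheduling cycles could rank them differently and each evict a different victim. Ordering already-evicted workloads first and breaking timestamp ties by UID makes every cycle converge on the same candidates, which is what the old-a/old-b unit-test entries and the new same-priority integration test exercise.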