Avoid unnecessary preemptions when candidates have the same timestamp #1875

Merged
14 changes: 13 additions & 1 deletion pkg/scheduler/preemption/preemption.go
@@ -389,6 +389,7 @@ func workloadFits(wlReq cache.FlavorResourceQuantities, cq *cache.ClusterQueue,
}

// candidatesOrdering criteria:
// 0. Workloads already marked for preemption first.
// 1. Workloads from other ClusterQueues in the cohort before the ones in the
// same ClusterQueue as the preemptor.
// 2. Workloads with lower priority first.
@@ -397,6 +398,11 @@ func candidatesOrdering(candidates []*workload.Info, cq string, now time.Time) f
return func(i, j int) bool {
a := candidates[i]
b := candidates[j]
aEvicted := meta.IsStatusConditionTrue(a.Obj.Status.Conditions, kueue.WorkloadEvicted)
bEvicted := meta.IsStatusConditionTrue(b.Obj.Status.Conditions, kueue.WorkloadEvicted)
if aEvicted != bEvicted {
return aEvicted
}
aInCQ := a.ClusterQueue == cq
bInCQ := b.ClusterQueue == cq
if aInCQ != bInCQ {
@@ -407,7 +413,13 @@ func candidatesOrdering(candidates []*workload.Info, cq string, now time.Time) f
if pa != pb {
return pa < pb
}
return quotaReservationTime(b.Obj, now).Before(quotaReservationTime(a.Obj, now))
timeA := quotaReservationTime(a.Obj, now)
timeB := quotaReservationTime(b.Obj, now)
if !timeA.Equal(timeB) {
return timeA.After(timeB)
}
// Arbitrary comparison for deterministic sorting.
return a.Obj.UID < b.Obj.UID
}
}
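
For readers who want to see the new ordering in isolation, here is a minimal, self-contained sketch of the comparator that this hunk builds. It uses a simplified `candidate` struct in place of `workload.Info`, so the struct and its field names are illustrative only; the real implementation reads the `WorkloadEvicted` condition, the ClusterQueue assignment, the priority, and the `QuotaReserved` transition time from the Workload object.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// candidate is a stand-in for workload.Info; the fields mirror what
// candidatesOrdering reads from the real Workload object.
type candidate struct {
	uid              string
	evicted          bool      // WorkloadEvicted condition is true
	sameClusterQueue bool      // admitted in the preemptor's ClusterQueue
	priority         int32
	reservedAt       time.Time // QuotaReserved LastTransitionTime
}

func sortCandidates(cs []candidate) {
	sort.Slice(cs, func(i, j int) bool {
		a, b := cs[i], cs[j]
		// 0. Workloads already marked for eviction first, so a later
		//    scheduling cycle reuses them instead of preempting new victims.
		if a.evicted != b.evicted {
			return a.evicted
		}
		// 1. Workloads from other ClusterQueues in the cohort before the
		//    ones in the preemptor's own ClusterQueue.
		if a.sameClusterQueue != b.sameClusterQueue {
			return !a.sameClusterQueue
		}
		// 2. Lower priority first.
		if a.priority != b.priority {
			return a.priority < b.priority
		}
		// 3. More recently admitted first.
		if !a.reservedAt.Equal(b.reservedAt) {
			return a.reservedAt.After(b.reservedAt)
		}
		// 4. Arbitrary but deterministic tiebreak, so concurrent scheduling
		//    cycles converge on the same victim.
		return a.uid < b.uid
	})
}

func main() {
	now := time.Now()
	cs := []candidate{
		{uid: "old-b", sameClusterQueue: true, reservedAt: now},
		{uid: "old-a", sameClusterQueue: true, reservedAt: now},
		{uid: "evicted", sameClusterQueue: true, evicted: true, reservedAt: now},
	}
	sortCandidates(cs)
	for _, c := range cs {
		fmt.Println(c.uid) // prints: evicted, old-a, old-b
	}
}
```

The UID comparison in step 4 is what keeps two scheduling cycles from each evicting a different workload when both candidates were admitted at exactly the same time, which is the unnecessary preemption this PR avoids.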

18 changes: 14 additions & 4 deletions pkg/scheduler/preemption/preemption_test.go
@@ -1127,15 +1127,25 @@ func TestCandidatesOrdering(t *testing.T) {
Obj()),
workload.NewInfo(utiltesting.MakeWorkload("low", "").
ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
Priority(10).
Priority(-10).
Obj()),
workload.NewInfo(utiltesting.MakeWorkload("other", "").
ReserveQuota(utiltesting.MakeAdmission("other").Obj()).
Priority(10).
Obj()),
workload.NewInfo(utiltesting.MakeWorkload("old", "").
ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
workload.NewInfo(utiltesting.MakeWorkload("evicted", "").
SetOrReplaceCondition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
}).
Obj()),
workload.NewInfo(utiltesting.MakeWorkload("old-a", "").
UID("old-a").
ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
Obj()),
workload.NewInfo(utiltesting.MakeWorkload("old-b", "").
UID("old-b").
ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
Obj()),
workload.NewInfo(utiltesting.MakeWorkload("current", "").
ReserveQuota(utiltesting.MakeAdmission("self").Obj()).
@@ -1151,7 +1161,7 @@ func TestCandidatesOrdering(t *testing.T) {
for i, c := range candidates {
gotNames[i] = workload.Key(c.Obj)
}
wantCandidates := []string{"/other", "/low", "/current", "/old", "/high"}
wantCandidates := []string{"/evicted", "/other", "/low", "/current", "/old-a", "/old-b", "/high"}
if diff := cmp.Diff(wantCandidates, gotNames); diff != "" {
t.Errorf("Sorted with wrong order (-want,+got):\n%s", diff)
}
12 changes: 11 additions & 1 deletion pkg/util/testing/wrappers.go
@@ -82,6 +82,11 @@ func (w *WorkloadWrapper) Clone() *WorkloadWrapper {
return &WorkloadWrapper{Workload: *w.DeepCopy()}
}

func (w *WorkloadWrapper) UID(uid types.UID) *WorkloadWrapper {
w.Workload.UID = uid
return w
}

func (w *WorkloadWrapper) Finalizers(fin ...string) *WorkloadWrapper {
w.ObjectMeta.Finalizers = fin
return w
@@ -116,11 +121,16 @@ func (w *WorkloadWrapper) Active(a bool) *WorkloadWrapper {

// ReserveQuota sets workload admission and adds a "QuotaReserved" status condition
func (w *WorkloadWrapper) ReserveQuota(a *kueue.Admission) *WorkloadWrapper {
return w.ReserveQuotaAt(a, time.Now())
}

// ReserveQuotaAt sets workload admission and adds a "QuotaReserved" status condition
func (w *WorkloadWrapper) ReserveQuotaAt(a *kueue.Admission, now time.Time) *WorkloadWrapper {
w.Status.Admission = a
w.Status.Conditions = []metav1.Condition{{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
LastTransitionTime: metav1.NewTime(now),
Reason: "AdmittedByTest",
Message: fmt.Sprintf("Admitted by ClusterQueue %s", w.Status.Admission.ClusterQueue),
}}
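
The exported ReserveQuota keeps its old behavior by delegating to ReserveQuotaAt with time.Now(). As a hedged, hypothetical sketch (a test-package fragment assuming the usual utiltesting import alias), this is how a test can pin two workloads to an identical reservation timestamp so that ordering falls through to the UID tiebreak, mirroring the old-a/old-b fixtures above:

```go
// Sketch only: give both workloads the same QuotaReserved transition time
// so that only the UID comparison distinguishes them.
now := time.Now()
a := utiltesting.MakeWorkload("old-a", "").
	UID("old-a").
	ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
	Obj()
b := utiltesting.MakeWorkload("old-b", "").
	UID("old-b").
	ReserveQuotaAt(utiltesting.MakeAdmission("self").Obj(), now).
	Obj()
_ = []*kueue.Workload{a, b}
```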
52 changes: 50 additions & 2 deletions test/integration/scheduler/preemption_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package scheduler

import (
"fmt"
"time"

"github.com/onsi/ginkgo/v2"
@@ -306,7 +307,7 @@ var _ = ginkgo.Describe("Preemption", func() {
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, betaCQ.Name, betaLowWl)
})

ginkgo.It("Should preempt all necessary workloads in concurrent scheduling", func() {
ginkgo.It("Should preempt all necessary workloads in concurrent scheduling with different priorities", func() {
ginkgo.By("Creating workloads in beta-cq that borrow quota")

betaMidWl := testing.MakeWorkload("beta-mid", ns.Name).
@@ -345,7 +346,7 @@ var _ = ginkgo.Describe("Preemption", func() {
util.FinishEvictionForWorkloads(ctx, k8sClient, betaMidWl)

// one of alpha-mid and gamma-mid should be admitted
gomega.Eventually(func() []*kueue.Workload { return util.FilterAdmittedWorkloads(ctx, k8sClient, alphaMidWl, gammaMidWl) }, util.Interval*4, util.Interval).Should(gomega.HaveLen(1))
util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, alphaMidWl, gammaMidWl)

// betaHighWl remains admitted
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, betaCQ.Name, betaHighWl)
@@ -356,6 +357,53 @@ var _ = ginkgo.Describe("Preemption", func() {
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, alphaCQ.Name, alphaMidWl)
util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, gammaCQ.Name, gammaMidWl)
})

ginkgo.It("Should preempt all necessary workloads in concurrent scheduling with the same priority", func() {
var betaWls []*kueue.Workload
for i := 0; i < 3; i++ {
wl := testing.MakeWorkload(fmt.Sprintf("beta-%d", i), ns.Name).
Queue(betaQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
betaWls = append(betaWls, wl)
}
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, betaWls...)

ginkgo.By("Creating preempting pods")

alphaWl := testing.MakeWorkload("alpha", ns.Name).
Queue(alphaQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, alphaWl)).To(gomega.Succeed())

gammaWl := testing.MakeWorkload("gamma", ns.Name).
Queue(gammaQ.Name).
Request(corev1.ResourceCPU, "2").
Obj()
gomega.Expect(k8sClient.Create(ctx, gammaWl)).To(gomega.Succeed())

var evictedWorkloads []*kueue.Workload
gomega.Eventually(func() int {
evictedWorkloads = util.FilterEvictedWorkloads(ctx, k8sClient, betaWls...)
return len(evictedWorkloads)
}, util.Timeout, util.Interval).Should(gomega.Equal(1), "Number of evicted workloads")

ginkgo.By("Finishing eviction for first set of preempted workloads")
util.FinishEvictionForWorkloads(ctx, k8sClient, evictedWorkloads...)
util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, alphaWl, gammaWl)

gomega.Eventually(func() int {
evictedWorkloads = util.FilterEvictedWorkloads(ctx, k8sClient, betaWls...)
return len(evictedWorkloads)
}, util.Timeout, util.Interval).Should(gomega.Equal(2), "Number of evicted workloads")

ginkgo.By("Finishing eviction for second set of preempted workloads")
util.FinishEvictionForWorkloads(ctx, k8sClient, evictedWorkloads...)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, alphaWl, gammaWl)
util.ExpectWorkloadsToBeAdmittedCount(ctx, k8sClient, 1, betaWls...)
})
})

ginkgo.Context("In a cohort with StrictFIFO", func() {
26 changes: 22 additions & 4 deletions test/util/util.go
@@ -243,11 +243,21 @@ func ExpectWorkloadsToHaveQuotaReservation(ctx context.Context, k8sClient client
}

func FilterAdmittedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
return filterWorkloads(ctx, k8sClient, workload.HasQuotaReservation, wls...)
}

func FilterEvictedWorkloads(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) []*kueue.Workload {
return filterWorkloads(ctx, k8sClient, func(wl *kueue.Workload) bool {
return apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted)
}, wls...)
}

func filterWorkloads(ctx context.Context, k8sClient client.Client, filter func(*kueue.Workload) bool, wls ...*kueue.Workload) []*kueue.Workload {
ret := make([]*kueue.Workload, 0, len(wls))
var updatedWorkload kueue.Workload
for _, wl := range wls {
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)
if err == nil && workload.HasQuotaReservation(&updatedWorkload) {
if err == nil && filter(&updatedWorkload) {
ret = append(ret, wl)
}
}
@@ -273,17 +283,25 @@ func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wl
}

func ExpectWorkloadsToBeAdmitted(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) {
gomega.EventuallyWithOffset(1, func() int {
expectWorkloadsToBeAdmittedCountWithOffset(ctx, 2, k8sClient, len(wls), wls...)
}

func ExpectWorkloadsToBeAdmittedCount(ctx context.Context, k8sClient client.Client, count int, wls ...*kueue.Workload) {
expectWorkloadsToBeAdmittedCountWithOffset(ctx, 2, k8sClient, count, wls...)
}

func expectWorkloadsToBeAdmittedCountWithOffset(ctx context.Context, offset int, k8sClient client.Client, count int, wls ...*kueue.Workload) {
gomega.EventuallyWithOffset(offset, func() int {
admitted := 0
var updatedWorkload kueue.Workload
for _, wl := range wls {
gomega.ExpectWithOffset(1, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
gomega.ExpectWithOffset(offset, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed())
if apimeta.IsStatusConditionTrue(updatedWorkload.Status.Conditions, kueue.WorkloadAdmitted) {
admitted++
}
}
return admitted
}, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are admitted")
}, Timeout, Interval).Should(gomega.Equal(count), "Not enough workloads are admitted")
}

func ExpectWorkloadToFinish(ctx context.Context, k8sClient client.Client, wlKey client.ObjectKey) {