Skip to content

Commit

Permalink
Fix preemption blocked by earlier pending Workload (#1866)
Browse files Browse the repository at this point in the history
Change-Id: I69584dd95c57539e163067a4fb93cdb32fc57461
  • Loading branch information
alculquicondor committed Mar 19, 2024
1 parent 0d82048 commit ba46285
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 6 deletions.
20 changes: 15 additions & 5 deletions pkg/scheduler/scheduler.go
Expand Up @@ -202,19 +202,23 @@ func (s *Scheduler) schedule(ctx context.Context) {
// head got admitted that should be scheduled in the cohort before the heads
// of other clusterQueues.
cycleCohortsUsage := cohortsUsage{}
cycleCohortsSkipPreemption := sets.New[string]()
for i := range entries {
e := &entries[i]
if e.assignment.RepresentativeMode() == flavorassigner.NoFit {
mode := e.assignment.RepresentativeMode()
if mode == flavorassigner.NoFit {
continue
}

cq := snapshot.ClusterQueues[e.ClusterQueue]
if cq.Cohort != nil {
sum := cycleCohortsUsage.totalUsageForCommonFlavorResources(cq.Cohort.Name, e.assignment.Usage)
// Skip the workload if it uses resources that were potentially assumed in this cycle and it will no longer fit
// in the cohort. If a resource of a flavor is used only once or for the first time in the cycle, the checks done
// by the flavorassigner are still valid.
if cycleCohortsUsage.hasCommonFlavorResources(cq.Cohort.Name, e.assignment.Usage) && !cq.FitInCohort(sum) {
// Check whether there was an assignment in this cycle that could render the next assignments invalid:
// - If the workload no longer fits in the cohort.
// - If there was another assignment in the cohort, then the preemption calculation is no longer valid.
if cycleCohortsUsage.hasCommonFlavorResources(cq.Cohort.Name, e.assignment.Usage) &&
((mode == flavorassigner.Fit && !cq.FitInCohort(sum)) ||
(mode == flavorassigner.Preempt && cycleCohortsSkipPreemption.Has(cq.Cohort.Name))) {
e.status = skipped
e.inadmissibleMsg = "other workloads in the cohort were prioritized"
// When the workload needs borrowing and there is another workload in the cohort that doesn't
Expand All @@ -241,6 +245,9 @@ func (s *Scheduler) schedule(ctx context.Context) {
e.inadmissibleMsg += fmt.Sprintf(". Pending the preemption of %d workload(s)", preempted)
e.requeueReason = queue.RequeueReasonPendingPreemption
}
if cq.Cohort != nil {
cycleCohortsSkipPreemption.Insert(cq.Cohort.Name)
}
} else {
log.V(2).Info("Workload requires preemption, but there are no candidate workloads allowed for preemption", "preemption", cq.Preemption)
}
Expand All @@ -262,6 +269,9 @@ func (s *Scheduler) schedule(ctx context.Context) {
if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil {
e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
}
if cq.Cohort != nil {
cycleCohortsSkipPreemption.Insert(cq.Cohort.Name)
}
}

// 6. Requeue the heads that were not scheduled.
Expand Down
54 changes: 54 additions & 0 deletions pkg/scheduler/scheduler_test.go
Expand Up @@ -716,6 +716,60 @@ func TestSchedule(t *testing.T) {
"eng-alpha/borrower": *utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "on-demand", "60").Obj(),
},
},
"multiple CQs need preemption": {
additionalClusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("other-alpha").
Cohort("other").
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "50", "50").Obj(),
).
Obj(),
*utiltesting.MakeClusterQueue("other-beta").
Cohort("other").
Preemption(kueue.ClusterQueuePreemption{
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
ResourceGroup(
*utiltesting.MakeFlavorQuotas("on-demand").
Resource(corev1.ResourceCPU, "50", "10").Obj(),
).
Obj(),
},
additionalLocalQueues: []kueue.LocalQueue{
*utiltesting.MakeLocalQueue("other", "eng-alpha").ClusterQueue("other-alpha").Obj(),
*utiltesting.MakeLocalQueue("other", "eng-beta").ClusterQueue("other-beta").Obj(),
},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("preemptor", "eng-beta").
Priority(-1).
Queue("other").
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltesting.MakeWorkload("pending", "eng-alpha").
Priority(1).
Queue("other").
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltesting.MakeWorkload("use-all", "eng-alpha").
Request(corev1.ResourceCPU, "100").
ReserveQuota(utiltesting.MakeAdmission("other-alpha").Assignment(corev1.ResourceCPU, "on-demand", "100").Obj()).
Obj(),
},
wantLeft: map[string][]string{
// Preemptor is not admitted in this cycle.
"other-beta": {"eng-beta/preemptor"},
},
wantInadmissibleLeft: map[string][]string{
"other-alpha": {"eng-alpha/pending"},
},
wantPreempted: sets.New("eng-alpha/use-all"),
wantAssignments: map[string]kueue.Admission{
// Removal from cache for the preempted workloads is deferred until we receive Workload updates
"eng-alpha/use-all": *utiltesting.MakeAdmission("other-alpha").Assignment(corev1.ResourceCPU, "on-demand", "100").Obj(),
},
},
"cannot borrow resource not listed in clusterQueue": {
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("new", "eng-alpha").
Expand Down
80 changes: 79 additions & 1 deletion test/integration/scheduler/preemption_test.go
Expand Up @@ -358,6 +358,84 @@ var _ = ginkgo.Describe("Preemption", func() {
})
})

ginkgo.Context("In a cohort with StrictFIFO", func() {
var (
alphaCQ, betaCQ *kueue.ClusterQueue
alphaLQ, betaLQ *kueue.LocalQueue
oneFlavor *kueue.ResourceFlavor
)

ginkgo.BeforeEach(func() {
oneFlavor = testing.MakeResourceFlavor("one").Obj()
gomega.Expect(k8sClient.Create(ctx, oneFlavor)).To(gomega.Succeed())

alphaCQ = testing.MakeClusterQueue("alpha-cq").
Cohort("all").
QueueingStrategy(kueue.StrictFIFO).
ResourceGroup(*testing.MakeFlavorQuotas("one").Resource(corev1.ResourceCPU, "2").Obj()).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, alphaCQ)).To(gomega.Succeed())
alphaLQ = testing.MakeLocalQueue("alpha-lq", ns.Name).ClusterQueue("alpha-cq").Obj()
gomega.Expect(k8sClient.Create(ctx, alphaLQ)).To(gomega.Succeed())
betaCQ = testing.MakeClusterQueue("beta-cq").
Cohort("all").
QueueingStrategy(kueue.StrictFIFO).
ResourceGroup(*testing.MakeFlavorQuotas("one").Resource(corev1.ResourceCPU, "2").Obj()).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
ReclaimWithinCohort: kueue.PreemptionPolicyAny,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, betaCQ)).To(gomega.Succeed())
betaLQ = testing.MakeLocalQueue("beta-lq", ns.Name).ClusterQueue("beta-cq").Obj()
gomega.Expect(k8sClient.Create(ctx, betaLQ)).To(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteWorkloadsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, alphaLQ)).To(gomega.Succeed())
gomega.Expect(util.DeleteClusterQueue(ctx, k8sClient, alphaCQ)).To(gomega.Succeed())
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, betaLQ)).To(gomega.Succeed())
gomega.Expect(util.DeleteClusterQueue(ctx, k8sClient, betaCQ)).To(gomega.Succeed())
})

ginkgo.It("Should reclaim from cohort even if another CQ has pending workloads", func() {
useAllAlphaWl := testing.MakeWorkload("use-all", ns.Name).
Queue("alpha-lq").
Priority(1).
Request(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, useAllAlphaWl)).To(gomega.Succeed())
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, useAllAlphaWl)

pendingAlphaWl := testing.MakeWorkload("pending", ns.Name).
Queue("alpha-lq").
Priority(0).
Request(corev1.ResourceCPU, "1").
Obj()
gomega.Expect(k8sClient.Create(ctx, pendingAlphaWl)).To(gomega.Succeed())
util.ExpectWorkloadsToBePending(ctx, k8sClient, pendingAlphaWl)

ginkgo.By("Creating a workload to reclaim quota")

preemptorBetaWl := testing.MakeWorkload("preemptor", ns.Name).
Queue("beta-lq").
Priority(-1).
Request(corev1.ResourceCPU, "1").
Obj()
gomega.Expect(k8sClient.Create(ctx, preemptorBetaWl)).To(gomega.Succeed())
util.ExpectWorkloadsToBePreempted(ctx, k8sClient, useAllAlphaWl)
util.FinishEvictionForWorkloads(ctx, k8sClient, useAllAlphaWl)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, preemptorBetaWl)
util.ExpectWorkloadsToBePending(ctx, k8sClient, useAllAlphaWl, pendingAlphaWl)
})

})

ginkgo.Context("When most quota is in a shared ClusterQueue in a cohort", func() {
var (
aStandardCQ, aBestEffortCQ, bStandardCQ, bBestEffortCQ, sharedCQ *kueue.ClusterQueue
Expand Down Expand Up @@ -550,7 +628,7 @@ var _ = ginkgo.Describe("Preemption", func() {
})
})

ginkgo.Context("Should be able to preempt when lending limit enabled", func() {
ginkgo.Context("When lending limit enabled", func() {
var (
prodCQ *kueue.ClusterQueue
devCQ *kueue.ClusterQueue
Expand Down

0 comments on commit ba46285

Please sign in to comment.