scale down

n

m

patch error field not declared in schema

commented out podSet immutability from workload webhook to be able to update that field

added more comments

clean code

nit

debugging

cluster queue reconciliation fixed; it had to do with the TotalRequests info from admission inside the workload Go file

working with scheduler

cleaning code

integration test, but it interferes with the parallelism test, which is expected

updated parallelism it test

updated wrappers

kep

removed Kep

removed log lines

clean code

added a better conditional for updating the workload on resize if the job is a RayCluster

added Kind condition

updated test and equivalentToWorkload condition

added podSet assignments check
vicentefb committed Mar 15, 2024
1 parent 463be60 commit daa8d26
Showing 7 changed files with 173 additions and 18 deletions.
21 changes: 21 additions & 0 deletions pkg/controller/core/workload_controller.go
@@ -48,6 +48,7 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
@@ -587,11 +588,31 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil {
log.Error(err, "Updating workload in cache")
}
// If the worker-group PodSetAssignments count differs between the old and new
// workload, requeue it so the scheduler recomputes the PodSetAssignments.
updatePsa := compareAdmissionPodSetAssignmentCount(oldWl.Status.Admission, wlCopy.Status.Admission)
if features.Enabled(features.DynamicallySizedJobs) && updatePsa && !r.queues.UpdateWorkload(oldWl, wlCopy) {
log.V(2).Info("Queue for updated workload didn't exist; ignoring for now")
}
}

return true
}

func compareAdmissionPodSetAssignmentCount(oldWlAdmission *kueue.Admission, newWlAdmission *kueue.Admission) bool {
// This is specific to RayClusters, whose workloads have exactly two PodSets: the head group and the worker group.
if oldWlAdmission == nil || newWlAdmission == nil {
return false
}
oldWlPsa := len(oldWlAdmission.PodSetAssignments)
newWlPsa := len(newWlAdmission.PodSetAssignments)
if oldWlPsa == 2 && oldWlPsa == newWlPsa {
if oldWlAdmission.PodSetAssignments[1].Name == newWlAdmission.PodSetAssignments[1].Name {
oldCount := oldWlAdmission.PodSetAssignments[1].Count
newCount := newWlAdmission.PodSetAssignments[1].Count
// Count is a *int32, so compare the values rather than the pointers.
if oldCount != nil && newCount != nil && *oldCount != *newCount {
return true
}
}
}
return false
}

func (r *WorkloadReconciler) Generic(e event.GenericEvent) bool {
r.log.V(3).Info("Ignore generic event", "obj", klog.KObj(e.Object), "kind", e.Object.GetObjectKind().GroupVersionKind())
return false
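To illustrate the helper's contract, here is a minimal test-style sketch; the PodSet names and counts are hypothetical, and it assumes the v1beta1 API types plus k8s.io/utils/ptr:

package core

import (
	"testing"

	"k8s.io/utils/ptr"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func TestCompareAdmissionPodSetAssignmentCount(t *testing.T) {
	// Two PodSetAssignments, mirroring a RayCluster's head and worker groups.
	oldAdmission := &kueue.Admission{PodSetAssignments: []kueue.PodSetAssignment{
		{Name: "head", Count: ptr.To[int32](1)},
		{Name: "workers-group-0", Count: ptr.To[int32](4)},
	}}
	resized := oldAdmission.DeepCopy()
	resized.PodSetAssignments[1].Count = ptr.To[int32](2)

	if compareAdmissionPodSetAssignmentCount(oldAdmission, oldAdmission) {
		t.Error("unchanged counts should not report a resize")
	}
	if !compareAdmissionPodSetAssignmentCount(oldAdmission, resized) {
		t.Error("worker count going from 4 to 2 should report a resize")
	}
}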
25 changes: 21 additions & 4 deletions pkg/controller/jobframework/reconciler.go
@@ -348,6 +348,21 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}
}

// 4.1 update the workload to match the job when a RayCluster is scaled down
if features.Enabled(features.DynamicallySizedJobs) && wl != nil && workload.IsAdmitted(wl) && job.GVK().Kind == "RayCluster" {
podSets := job.PodSets()
// RayCluster workloads have two PodSets: the head group (index 0) and the worker group (index 1).
jobPodSetCount := podSets[1].Count
workloadPodSetCount := wl.Spec.PodSets[1].Count
if workloadPodSetCount > jobPodSetCount {
toUpdate := wl
_, err := r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated Workload due to resize: %v")
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
}

// 5. handle WaitForPodsReady only for a standalone job.
// handle a job when waitForPodsReady is enabled, and it is the main job
if r.waitForPodsReady {
@@ -574,7 +589,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
}

if toUpdate != nil {
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate)
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated not matching Workload for suspended job: %v")
}

return match, nil
@@ -679,7 +694,9 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob,
jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets())

if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil {
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) {
// A scaled-down RayCluster's worker PodSet count drops below the workload's;
// treat that as equivalent so the job is not suspended.
scaledDown := features.Enabled(features.DynamicallySizedJobs) && job.GVK().Kind == "RayCluster" && jobPodSets[1].Count < wl.Spec.PodSets[1].Count
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) || scaledDown {
return true
}
// If the workload is admitted but the job is suspended, do the check
@@ -692,7 +709,7 @@
return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets)
}

func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) {
func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, message string) (*kueue.Workload, error) {
newWl, err := r.constructWorkload(ctx, job, object)
if err != nil {
return nil, fmt.Errorf("can't construct workload for update: %w", err)
@@ -707,7 +724,7 @@ func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job Generi
}

r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedWorkload,
"Updated not matching Workload for suspended job: %v", klog.KObj(wl))
message, klog.KObj(wl))
return newWl, nil
}

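The scale-down check now appears in both step 4.1 and equivalentToWorkload; as a sketch, it could be factored into a single predicate. isRayClusterScaleDown is hypothetical (not part of this change) and assumes the RayCluster PodSet layout of head at index 0 and workers at index 1:

// isRayClusterScaleDown is a hypothetical helper for the jobframework package;
// it reports whether the job's worker PodSet shrank below the count recorded
// in the workload spec.
func isRayClusterScaleDown(job GenericJob, wl *kueue.Workload) bool {
	if !features.Enabled(features.DynamicallySizedJobs) || job.GVK().Kind != "RayCluster" {
		return false
	}
	podSets := job.PodSets()
	if len(podSets) != 2 || len(wl.Spec.PodSets) != 2 {
		return false
	}
	return podSets[1].Count < wl.Spec.PodSets[1].Count
}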
5 changes: 5 additions & 0 deletions pkg/features/kube_features.go
@@ -83,6 +83,10 @@ const (
//
// Enables lending limit.
LendingLimit featuregate.Feature = "LendingLimit"
// owner: @vicenteferrara
// kep: <TODO>
// alpha: v0.7
DynamicallySizedJobs featuregate.Feature = "DynamicallySizedJobs"
)

func init() {
@@ -104,6 +108,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
PrioritySortingWithinCohort: {Default: true, PreRelease: featuregate.Beta},
MultiKueue: {Default: false, PreRelease: featuregate.Alpha},
LendingLimit: {Default: false, PreRelease: featuregate.Alpha},
DynamicallySizedJobs: {Default: true, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() {
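Because the gate is alpha, unit tests can flip it explicitly with the SetFeatureGateDuringTest helper shown above; a hypothetical usage (TestScaleDownPath and its body are illustrative only):

func TestScaleDownPath(t *testing.T) {
	// Enable the gate for this test; the returned func restores the previous value.
	restore := features.SetFeatureGateDuringTest(t, features.DynamicallySizedJobs, true)
	defer restore()
	// ... exercise the resize handling under the enabled gate ...
}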
65 changes: 63 additions & 2 deletions pkg/scheduler/scheduler.go
@@ -258,6 +258,9 @@ func (s *Scheduler) schedule(ctx context.Context) {
s.cache.WaitForPodsReady(ctx)
log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
}
if e.status == assumed {
// PodSetAssignments for a resize were already applied in nominate; skip admission.
continue
}
e.status = nominated
if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil {
e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
@@ -313,8 +316,21 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
ns := corev1.Namespace{}
e := entry{Info: w}
if s.cache.IsAssumedOrAdmittedWorkload(w) {
log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj))
continue
if features.Enabled(features.DynamicallySizedJobs) {
// Recompute the flavor and resource assignment so the PodSetAssignments
// can be updated to match the resized counts.
e.assignment, e.preemptionTargets = s.getResizeAssignment(log, &e.Info, &snap)
e.inadmissibleMsg = e.assignment.Message()
e.Info.LastAssignment = &e.assignment.LastState
e.status = assumed
if err := s.updateResizePodSetAssignments(ctx, e); err != nil {
log.Error(err, "Could not apply admission to assumed workload")
continue
}
} else {
log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj))
continue
}
} else if workload.HasRetryOrRejectedChecks(w.Obj) {
e.inadmissibleMsg = "The workload has failed admission checks"
} else if snap.InactiveClusterQueueSets.Has(w.ClusterQueue) {
@@ -377,6 +393,29 @@ type partialAssignment struct {
preemptionTargets []*workload.Info
}

func (s *Scheduler) getResizeAssignment(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
cq := snap.ClusterQueues[wl.ClusterQueue]
flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors)
resizeAssignment := flvAssigner.Assign(log, nil)
var faPreemptionTargets []*workload.Info

arm := resizeAssignment.RepresentativeMode()
if arm == flavorassigner.Fit {
return resizeAssignment, nil
}

if arm == flavorassigner.Preempt {
faPreemptionTargets = s.preemptor.GetTargets(*wl, resizeAssignment, snap)
}

// Return the preemption targets if partial admission is disabled or preemption can help.
if !features.Enabled(features.PartialAdmission) || len(faPreemptionTargets) > 0 {
return resizeAssignment, faPreemptionTargets
}

return resizeAssignment, nil
}

func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
cq := snap.ClusterQueues[wl.ClusterQueue]
flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors)
@@ -477,6 +516,28 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e
return nil
}

// updateResizePodSetAssignments recomputes the admission for a resized
// workload and applies the updated PodSetAssignments directly to the
// apiserver, without requeueing the workload.
func (s *Scheduler) updateResizePodSetAssignments(ctx context.Context, e entry) error {
newWorkload := e.Obj.DeepCopy()
admission := &kueue.Admission{
ClusterQueue: kueue.ClusterQueueReference(e.ClusterQueue),
PodSetAssignments: e.assignment.ToAPI(),
}

workload.SetQuotaReservation(newWorkload, admission)
_ = workload.SyncAdmittedCondition(newWorkload)

if e.status == assumed {
// A scale down only needs the workload updated with the new admission status;
// it must not be requeued through another scheduling cycle.
return s.applyAdmission(ctx, newWorkload)
}

return nil
}

// admit sets the admitting clusterQueue and flavors into the workload of
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/raycluster/wrappers.go
@@ -80,6 +80,12 @@ func (j *ClusterWrapper) NodeSelectorHeadGroup(k, v string) *ClusterWrapper {
return j
}

// SetReplicaCount sets the replica count of the first worker group.
func (j *ClusterWrapper) SetReplicaCount(c int32) *ClusterWrapper {
j.Spec.WorkerGroupSpecs[0].Replicas = ptr.To(c)
return j
}

// Obj returns the inner Job.
func (j *ClusterWrapper) Obj() *rayv1.RayCluster {
return &j.RayCluster
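For reference, a hypothetical wrapper chain using the new helper, mirroring the integration test below (ns and localQueue come from the test's setup):

cluster := testingraycluster.MakeCluster("dev-job", ns.Name).
	SetReplicaCount(4). // worker group starts with 4 replicas
	Queue(localQueue.Name).
	RequestHead(corev1.ResourceCPU, "1").
	RequestWorkerGroup(corev1.ResourceCPU, "1").
	Obj()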
11 changes: 0 additions & 11 deletions pkg/webhooks/workload_webhook.go
@@ -347,29 +347,18 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
allErrs = append(allErrs, ValidateWorkload(newObj)...)

if workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassSource, oldObj.Spec.PriorityClassSource, specPath.Child("priorityClassSource"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassName, oldObj.Spec.PriorityClassName, specPath.Child("priorityClassName"))...)
}
if workload.HasQuotaReservation(newObj) && workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.QueueName, oldObj.Spec.QueueName, specPath.Child("queueName"))...)
allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...)
}
allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...)
allErrs = append(allErrs, validateImmutablePodSetUpdates(newObj, oldObj, statusPath.Child("admissionChecks"))...)

return allErrs
}

// validateAdmissionUpdate validates that admission can be set or unset, but the
// fields within can't change.
func validateAdmissionUpdate(new, old *kueue.Admission, path *field.Path) field.ErrorList {
if old == nil || new == nil {
return nil
}
return apivalidation.ValidateImmutableField(new, old, path)
}

// validateReclaimablePodsUpdate validates that the reclaimable counts do not decrease, this should be checked
// while the workload is admitted.
func validateReclaimablePodsUpdate(newObj, oldObj *kueue.Workload, basePath *field.Path) field.ErrorList {
@@ -205,7 +205,7 @@ var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.Continue
return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadQuotaReserved)
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("checking the job gets suspended when parallelism changes and the added node selectors are removed")
ginkgo.By("checking the job is suspended when parallelism increases and the added node selectors are removed")
parallelism := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1)
newParallelism := parallelism + 1
createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newParallelism
@@ -632,6 +632,62 @@ var _ = ginkgo.Describe("RayCluster Job controller interacting with scheduler",
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
})

ginkgo.It("Should not suspend job when there's a scale down", func() {
ginkgo.By("creating localQueue")
localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())

ginkgo.By("checking a dev job starts")
job := testingraycluster.MakeCluster("dev-job", ns.Name).SetReplicaCount(4).Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "1").
RequestWorkerGroup(corev1.ResourceCPU, "1").
Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
createdJob := &rayv1.RayCluster{}
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).
Should(gomega.Succeed())
return *createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())
gomega.Expect(createdJob.Spec.HeadGroupSpec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
gomega.Expect(createdJob.Spec.WorkerGroupSpecs[0].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)

ginkgo.By("reduce the number of replicas, check the job is not suspended")
replicaCount := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1)
newReplicaCount := replicaCount - 2
createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newReplicaCount
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).
Should(gomega.Succeed())
return *createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())

ginkgo.By("checking the workload is updated with new count")
createdWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadraycluster.GetWorkloadNameForRayCluster(job.Name, job.UID), Namespace: ns.Name}

gomega.Eventually(func() error {
return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func() bool {
if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
return false
}
return createdWorkload.Spec.PodSets[1].Count == newReplicaCount
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
return false
}
return *createdWorkload.Status.Admission.PodSetAssignments[1].Count == newReplicaCount
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

})

})

var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
