scale down
patch error: field not declared in schema

commented out podSet immutability in the workload webhook to be able to update that field

added more comments and cleaned up code

fixed cluster queue reconciliation; it had to do with the totalRequests info from admission inside the workload.go file

working on the scheduler; cleaned up code

integration test, but it interferes with the parallelism test, which is expected

updated parallelism integration test

updated wrappers

added, then removed, the KEP

removed log lines and cleaned up code

added a better conditional for updating the resize if the job is a RayCluster

added Kind condition

updated test and equivalentToWorkload condition

added podSet assignments check

updated feature gate

updated equivalentToWorkload and fixed lint

removed changes from scheduler and workload controller

updated workload controller reconciler to update spec and status

updated feature gate and variables; made code more generic

updated workload controller helper method

fixed typos and addressed review comments

updated workload controller to use unused quota
vicentefb committed Apr 24, 2024
1 parent 9890f41 commit def03e9
Showing 8 changed files with 201 additions and 8 deletions.
57 changes: 56 additions & 1 deletion pkg/controller/core/workload_controller.go
@@ -51,9 +51,11 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/limitrange"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)
@@ -197,6 +199,12 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if updated, err := r.reconcileOnClusterQueueActiveState(ctx, &wl, cqName); updated || err != nil {
return ctrl.Result{}, err
}
	if features.Enabled(features.DynamicallySizedJobs) {
		if err := r.downSizeJobIfNecessary(ctx, &wl); err != nil {
			return ctrl.Result{}, err
		}
	}

return r.reconcileNotReadyTimeout(ctx, req, &wl)
}
@@ -247,6 +255,52 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}

// isScaledDown reports whether any pod set in the Workload spec now requests
// fewer pods than were assigned at admission. Index 0 is skipped: for the
// integrations supported so far it holds the head pod set, which cannot be
// downsized.
func (r *WorkloadReconciler) isScaledDown(wl *kueue.Workload) bool {
	if wl.Status.Admission == nil {
		return false
	}
	for i := 1; i < len(wl.Spec.PodSets); i++ {
		if ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) > wl.Spec.PodSets[i].Count {
			return true
		}
	}
	return false
}

// downSizeJobIfNecessary releases the quota held by pods removed in a
// downsize: for every worker pod set whose admitted count exceeds the count
// now requested in the spec, it recomputes the assigned resource usage and
// count, then syncs the admission status.
func (r *WorkloadReconciler) downSizeJobIfNecessary(ctx context.Context, wl *kueue.Workload) error {
	if wl.Status.Admission == nil {
		return nil
	}
	statusUpdate := false
	for i := 1; i < len(wl.Spec.PodSets); i++ {
		if ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) > wl.Spec.PodSets[i].Count {
			// Per-pod resource requests and the usage currently assigned.
			originalResourceRequests := limitrange.TotalRequests(&wl.Spec.PodSets[i].Template.Spec)
			currentAssignedResourceUsage := wl.Status.Admission.PodSetAssignments[i].ResourceUsage

			// Subtract the requests of the removed pods from the assigned usage.
			diff := ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) - wl.Spec.PodSets[i].Count
			for k := range currentAssignedResourceUsage {
				resourceQuantity := originalResourceRequests[k]
				resourceQuantity.Mul(int64(diff))
				originalResourceRequests[k] = resourceQuantity

				assignedResourceQuantity := currentAssignedResourceUsage[k]
				assignedResourceQuantity.Sub(originalResourceRequests[k])
				currentAssignedResourceUsage[k] = assignedResourceQuantity
			}

			wl.Status.Admission.PodSetAssignments[i].Count = ptr.To(wl.Spec.PodSets[i].Count)
			wl.Status.Admission.PodSetAssignments[i].ResourceUsage = currentAssignedResourceUsage

			statusUpdate = true
		}
	}
	if statusUpdate {
		workload.SyncAdmittedCondition(wl)
		return workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
	}
	return nil
}

func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl *kueue.Workload) (bool, error) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) || !workload.HasRetryOrRejectedChecks(wl) {
return false, nil
Expand Down Expand Up @@ -484,6 +538,7 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {

func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
oldWl, isWorkload := e.ObjectOld.(*kueue.Workload)

if !isWorkload {
// this event will be handled by the LimitRange/RuntimeClass handle
return true
@@ -576,7 +631,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
}
})
}
case prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods):
	case prevStatus == admitted && status == admitted && (!equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods) || r.isScaledDown(oldWl)):
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Update the workload from cache while holding the queues lock
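
To make the quota arithmetic in downSizeJobIfNecessary concrete, here is a small standalone sketch of the same resource.Quantity operations with illustrative values (4 workers at 1 CPU each, scaled down to 2); the values are assumptions, not data from the repo:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	perPod := resource.MustParse("1") // per-pod CPU request from the pod template
	removed := int64(2)               // pods removed by the downsize (4 -> 2)

	freed := perPod.DeepCopy()
	freed.Mul(removed) // quota freed by the removed pods: "2"

	usage := resource.MustParse("4") // usage assigned at admission for 4 pods
	usage.Sub(freed)                 // usage after the downsize

	fmt.Println(usage.String()) // prints "2"
}
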
6 changes: 6 additions & 0 deletions pkg/controller/jobframework/interface.go
@@ -63,6 +63,12 @@ type JobWithReclaimablePods interface {
ReclaimablePods() ([]kueue.ReclaimablePod, error)
}

type ResizableJobs interface {
	// IsResizable reports whether the job is being downsized relative to its
	// Workload. Only downsizing is supported for now; see KEP-77.
	IsResizable(wl *kueue.Workload) bool
}

type StopReason int

const (
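
For illustration, a hypothetical integration could satisfy this interface as sketched below; the type and field names are invented, and the RayCluster implementation later in this commit is the real example:

package example

import (
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// myJob is an invented integration type, shown only to illustrate the contract.
type myJob struct {
	podSets []kueue.PodSet // the pod sets the job currently requests
}

// IsResizable reports a downsize: the Workload spec (recorded at admission)
// counts more pods than the job now requests.
func (j *myJob) IsResizable(wl *kueue.Workload) bool {
	for i := range j.podSets {
		if i < len(wl.Spec.PodSets) && wl.Spec.PodSets[i].Count > j.podSets[i].Count {
			return true
		}
	}
	return false
}
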
20 changes: 16 additions & 4 deletions pkg/controller/jobframework/reconciler.go
@@ -356,6 +356,17 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
}
}

	// 4.1 update the Workload's podSet counts on a resize (downsize only).
	if resizableJob, implementsResizable := job.(ResizableJobs); implementsResizable && resizableJob.IsResizable(wl) && workload.IsAdmitted(wl) {
		if _, err := r.updateWorkloadToMatchJob(ctx, job, object, wl, "Updated Workload due to resize: %v"); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

// 5. handle WaitForPodsReady only for a standalone job.
// handle a job when waitForPodsReady is enabled, and it is the main job
if r.waitForPodsReady {
@@ -582,7 +593,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, object client.Object) (*kueue.Workload, error) {
}

if toUpdate != nil {
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate)
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated not matching Workload for suspended job: %v")
}

return match, nil
@@ -687,7 +698,8 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob, wl *kueue.Workload) bool {
jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets())

if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil {
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) {
		resizableJob, implementsResizable := job.(ResizableJobs)
		if equality.ComparePodSetSlices(jobPodSets, runningPodSets) || (implementsResizable && resizableJob.IsResizable(wl)) {
return true
}
// If the workload is admitted but the job is suspended, do the check
@@ -700,7 +712,7 @@
return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets)
}

func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) {
func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, message string) (*kueue.Workload, error) {
newWl, err := r.constructWorkload(ctx, job, object)
if err != nil {
return nil, fmt.Errorf("can't construct workload for update: %w", err)
@@ -715,7 +727,7 @@ func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, message string) (*kueue.Workload, error) {
}

r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedWorkload,
"Updated not matching Workload for suspended job: %v", klog.KObj(wl))
message, klog.KObj(wl))
return newWl, nil
}

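
Taken together, the resize path added by this commit reads roughly as follows (a summary sketch of the control flow, not code from the repo):

// 1. The RayCluster shrinks a worker group, so job.PodSets() now counts
//    fewer pods than wl.Spec.PodSets -> IsResizable(wl) returns true.
// 2. ReconcileGenericJob (step 4.1) calls updateWorkloadToMatchJob, which
//    rewrites wl.Spec.PodSets to the new, smaller counts.
// 3. WorkloadReconciler.downSizeJobIfNecessary sees the admitted count
//    exceed the spec count, shrinks PodSetAssignments, and frees the quota.
// 4. The admission status update fires the Update handler; isScaledDown(oldWl)
//    then requeues inadmissible workloads that may now fit.
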
13 changes: 13 additions & 0 deletions pkg/controller/jobs/raycluster/raycluster_controller.go
@@ -28,6 +28,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
)

@@ -87,6 +88,18 @@ func (j *RayCluster) GVK() schema.GroupVersionKind {
return gvk
}

// IsResizable reports whether the RayCluster is being downsized: a worker
// group in the Workload spec (recorded at admission) counts more replicas
// than the job currently requests. Index 0 is the head group and is skipped.
func (j *RayCluster) IsResizable(wl *kueue.Workload) bool {
	if features.Enabled(features.DynamicallySizedJobs) {
		pods := j.PodSets()
		for i := 1; i < len(pods) && i < len(wl.Spec.PodSets); i++ {
			if wl.Spec.PodSets[i].Count > pods[i].Count {
				return true
			}
		}
	}
	return false
}

func (j *RayCluster) PodSets() []kueue.PodSet {
// len = workerGroups + head
podSets := make([]kueue.PodSet, len(j.Spec.WorkerGroupSpecs)+1)
5 changes: 5 additions & 0 deletions pkg/features/kube_features.go
@@ -83,6 +83,10 @@ const (
//
// Enables lending limit.
LendingLimit featuregate.Feature = "LendingLimit"

	// owner: @vicenteferrara
	// kep: https://github.com/kubernetes-sigs/kueue/tree/3347afb5a8681f4fd2a5f3b1a5be8c5c0ebac488/keps/77-dynamically-sized-jobs
	// alpha: v0.8
	//
	// Enables dynamically sized jobs (downsizing only for now).
	DynamicallySizedJobs featuregate.Feature = "DynamicallySizedJobs"
)

func init() {
Expand All @@ -104,6 +108,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
PrioritySortingWithinCohort: {Default: true, PreRelease: featuregate.Beta},
MultiKueue: {Default: false, PreRelease: featuregate.Alpha},
LendingLimit: {Default: false, PreRelease: featuregate.Alpha},
DynamicallySizedJobs: {Default: false, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() {
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/raycluster/wrappers.go
@@ -80,6 +80,12 @@ func (j *ClusterWrapper) NodeSelectorHeadGroup(k, v string) *ClusterWrapper {
return j
}

// WorkerCount sets the replica count of the first worker group.
func (j *ClusterWrapper) WorkerCount(c int32) *ClusterWrapper {
j.Spec.WorkerGroupSpecs[0].Replicas = ptr.To(c)
return j
}

// Obj returns the inner Job.
func (j *ClusterWrapper) Obj() *rayv1.RayCluster {
return &j.RayCluster
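
The integration test added below exercises the new wrapper; for example, constructing a cluster with four workers:

cluster := testingraycluster.MakeCluster("dev-job", ns.Name).
	WorkerCount(4).
	Queue(localQueue.Name).
	RequestHead(corev1.ResourceCPU, "1").
	RequestWorkerGroup(corev1.ResourceCPU, "1").
	Obj()
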
8 changes: 6 additions & 2 deletions pkg/webhooks/workload_webhook.go
@@ -347,15 +347,19 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
allErrs = append(allErrs, ValidateWorkload(newObj)...)

if workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
		// With DynamicallySizedJobs enabled, spec.podSets may shrink after quota
		// is reserved, so the immutability check is skipped for that field.
		if !features.Enabled(features.DynamicallySizedJobs) {
			allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
		}
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassSource, oldObj.Spec.PriorityClassSource, specPath.Child("priorityClassSource"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassName, oldObj.Spec.PriorityClassName, specPath.Child("priorityClassName"))...)
}
if workload.HasQuotaReservation(newObj) && workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.QueueName, oldObj.Spec.QueueName, specPath.Child("queueName"))...)
allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...)
}
allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...)
	// With DynamicallySizedJobs enabled, status.admission is rewritten to release
	// the quota of removed pods, so its immutability check is skipped as well.
	if !features.Enabled(features.DynamicallySizedJobs) {
		allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...)
	}
allErrs = append(allErrs, validateImmutablePodSetUpdates(newObj, oldObj, statusPath.Child("admissionChecks"))...)

return allErrs
@@ -38,6 +38,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework"
workloadraycluster "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster"
_ "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" // to enable the framework
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/testing"
testingraycluster "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster"
testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
@@ -200,7 +201,7 @@ var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadQuotaReserved)
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("checking the job gets suspended when parallelism changes and the added node selectors are removed")
ginkgo.By("checking the job is suspended when parallelism increases and the added node selectors are removed")
parallelism := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1)
newParallelism := parallelism + 1
createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newParallelism
@@ -627,6 +628,97 @@ var _ = ginkgo.Describe("RayCluster Job controller interacting with scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
})

	// Enable the alpha DynamicallySizedJobs gate for the tests in this container.
	ginkgo.BeforeEach(func() {
		gomega.Expect(features.SetEnable(features.DynamicallySizedJobs, true)).To(gomega.Succeed())
	})

ginkgo.It("Should not suspend job when there's a scale down", func() {
ginkgo.By("creating localQueue")
localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())

ginkgo.By("checking a dev job starts")
job := testingraycluster.MakeCluster("dev-job", ns.Name).WorkerCount(4).Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "1").
RequestWorkerGroup(corev1.ResourceCPU, "1").
Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
createdJob := &rayv1.RayCluster{}
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).
Should(gomega.Succeed())
return *createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())
gomega.Expect(createdJob.Spec.HeadGroupSpec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
gomega.Expect(createdJob.Spec.WorkerGroupSpecs[0].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)

ginkgo.By("checking a second no-fit RayCluster does not start")
job2 := testingraycluster.MakeCluster("job2", ns.Name).Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "3").
RequestWorkerGroup(corev1.ResourceCPU, "4").
Obj()
gomega.Expect(k8sClient.Create(ctx, job2)).Should(gomega.Succeed())
createdJob2 := &rayv1.RayCluster{}
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job2.Name, Namespace: job2.Namespace}, createdJob2)).
Should(gomega.Succeed())
return *createdJob2.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 1)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)

ginkgo.By("reduce the number of replicas, check the job is not suspended")
replicaCount := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1)
newReplicaCount := replicaCount - 2
createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newReplicaCount
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).
Should(gomega.Succeed())
return *createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())

ginkgo.By("checking the workload is updated with new count")
createdWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadraycluster.GetWorkloadNameForRayCluster(job.Name, job.UID), Namespace: ns.Name}

gomega.Eventually(func() error {
return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func() bool {
if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
return false
}
return createdWorkload.Spec.PodSets[1].Count == newReplicaCount
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
return false
}
return *createdWorkload.Status.Admission.PodSetAssignments[1].Count == newReplicaCount
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

/*
ginkgo.By("checking the second RayCluster starts when the first one was deleted")
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job2.Name, Namespace: job2.Namespace}, createdJob2)).
Should(gomega.Succeed())
return *createdJob2.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())
gomega.Expect(createdJob2.Spec.HeadGroupSpec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
gomega.Expect(createdJob2.Spec.WorkerGroupSpecs[0].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
*/

})

})

var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
