Commit

scale down
n

m

patch error field not declared in schema

commented out podSet immutability from workload webhook to be able to update that field

added more comments

clean code

nit

debugging

n

m

patch error field not declared in schema

clean code

n

m

patch error field not declared in schema

commented out podSet immutability from workload webhook to be able to update that field

added more comments

clean code

nit

a

cluster queue reconciliation fixed; it had to do with the totalRequests info from admission inside the workload Go file

working with scheduler

cleaning code

cleaning code

cleaning

cleaning

cleaning

integration test, but it messes up the parallelism test, which should be expected

updated parallelism it test

updated wrappers

kep

removed Kep

removed log lines

clean code

added a better conditional for updating the resize if the job is a RayCluster

added Kind condition

updated test and equivalentToWorkload condition

added podset assignments check

updated feature gate

updated feature gate

updating equivalentToWorkload

fixed lint

removed changes from scheduler and workload controller

testing

updated workload controller reconciler to update spec and status

nit

update feature gate

update variables

made code more generic

updated workload controller helper method

typo

nit

addressed comments

updated workload controller to use unused quota

updated integration test to work

added unit test in workload controller

changed naming to resizable and fixed lint

nit

addressed comments
vicentefb committed Apr 30, 2024
1 parent 5401a3b commit fff664c
Showing 10 changed files with 389 additions and 9 deletions.
62 changes: 61 additions & 1 deletion pkg/controller/core/workload_controller.go
@@ -51,9 +51,11 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/limitrange"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)
@@ -198,6 +200,12 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if updated, err := r.reconcileOnClusterQueueActiveState(ctx, &wl, cqName); updated || err != nil {
return ctrl.Result{}, err
}
if features.Enabled(features.ResizableJobs) {
if err := r.downSizeJobIfNecessary(ctx, &wl); err != nil {
return ctrl.Result{}, err
}
}

return r.reconcileNotReadyTimeout(ctx, req, &wl)
}
@@ -248,6 +256,57 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

func (r *WorkloadReconciler) isScaledDown(wl *kueue.Workload) bool {
if !features.Enabled(features.ResizableJobs) {
return false
}
podSetSize := len(wl.Spec.PodSets)
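// Start at index 1: pod set 0 (the RayCluster head in the only currently supported case)
// is never resized, so only the remaining pod sets are compared against their admitted counts.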
for i := 1; i < podSetSize; i++ {
if ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) > wl.Spec.PodSets[i].Count {
return true
}
}
return false
}

func (r *WorkloadReconciler) downSizeJobIfNecessary(ctx context.Context, wl *kueue.Workload) error {
updateStatus := false
podSetSize := len(wl.Spec.PodSets)
for i := 1; i < podSetSize; i++ {
if ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) <= wl.Spec.PodSets[i].Count {
continue
}
// Get Resource Requests and Usage values
originalResourceRequests := limitrange.TotalRequests(&wl.Spec.PodSets[i].Template.Spec)
currentAssignedResourceUsage := wl.Status.Admission.PodSetAssignments[i].ResourceUsage

diff := ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) - wl.Spec.PodSets[i].Count
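// Release diff * per-pod request from the admitted usage for every resource.
// For example, a worker group admitted with 4 pods at 1 CPU each that is scaled
// down to 3 pods releases 1 CPU from the pod set's assigned ResourceUsage.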
for k := range currentAssignedResourceUsage {
resourceQuantity := originalResourceRequests[k]
resourceQuantity.Mul(int64(diff))
originalResourceRequests[k] = resourceQuantity

assignedResourceQuantity := currentAssignedResourceUsage[k]
assignedResourceQuantity.Sub(originalResourceRequests[k])
currentAssignedResourceUsage[k] = assignedResourceQuantity
}

wl.Status.Admission.PodSetAssignments[i].Count = ptr.To(wl.Spec.PodSets[i].Count)
wl.Status.Admission.PodSetAssignments[i].ResourceUsage = currentAssignedResourceUsage

updateStatus = true
}
if updateStatus {
// Update Status
workload.SyncAdmittedCondition(wl)
if err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true); err != nil {
return err
}
}
return nil
}

func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl *kueue.Workload) (bool, error) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) || !workload.HasRetryOrRejectedChecks(wl) {
return false, nil
Expand Down Expand Up @@ -485,6 +544,7 @@ func (r *WorkloadReconciler) Delete(e event.DeleteEvent) bool {

func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
oldWl, isWorkload := e.ObjectOld.(*kueue.Workload)

if !isWorkload {
// this event will be handled by the LimitRange/RuntimeClass handle
return true
@@ -582,7 +642,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
}
})
}
case prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods):
case (prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods)) || r.isScaledDown(oldWl):
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
// Update the workload from cache while holding the queues lock
126 changes: 125 additions & 1 deletion pkg/controller/core/workload_controller_test.go
@@ -26,6 +26,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -37,6 +38,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)
@@ -431,6 +433,126 @@ func TestReconcile(t *testing.T) {
Obj(),
wantWorkload: nil,
},
"admit - Scale Down - difference between PodSet.Count and PodSetAssignments[i].Count": {
workload: utiltesting.MakeWorkload("a", "ns").
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
kueue.PodSet{
Name: "head",
Count: int32(1),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: "head-container",
Resources: v1.ResourceRequirements{
Requests: make(v1.ResourceList),
},
},
},
},
},
},
kueue.PodSet{
Name: "workers-group-0",
Count: int32(3),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: "worker-container",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
},
},
},
},
},
}).
Request(v1.ResourceCPU, "1").
ReserveQuota(
utiltesting.MakeAdmissionScaleDown(v1.ResourceCPU, "10", int32(4), "cq", "head", "workers-group-0").
Assignment(v1.ResourceCPU, "unit-test-flavor", "1").
AssignmentPodCount(1).
Obj(),
).
Admitted(true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "head",
},
{
Name: "workers-group-0",
},
},
}).
Obj(),
wantWorkload: utiltesting.MakeWorkload("a", "ns").
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
kueue.PodSet{
Name: "head",
Count: int32(1),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: "head-container",
Resources: v1.ResourceRequirements{
Requests: make(v1.ResourceList),
},
},
},
},
},
},
kueue.PodSet{
Name: "workers-group-0",
Count: int32(3),
Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
{
Name: "worker-container",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
},
},
},
},
},
}).
Request(v1.ResourceCPU, "1").
ReserveQuota(
utiltesting.MakeAdmissionScaleDown(v1.ResourceCPU, "9", int32(3), "cq", "head", "workers-group-0").
Assignment(v1.ResourceCPU, "unit-test-flavor", "1").
AssignmentPodCount(1).
Obj(),
).
Admitted(true).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "head",
},
{
Name: "workers-group-0",
},
},
}).
Obj(),
},
"don't remove finalizer for owned finished workload": {
workload: utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Condition(metav1.Condition{
@@ -619,7 +741,9 @@ func TestReconcile(t *testing.T) {
t.Errorf("couldn't add the local queue to the cache: %v", err)
}
}

if err := features.SetEnable(features.ResizableJobs, true); err != nil {
t.Errorf("couldn't enable the ResizableJobs feature gate: %v", err)
}
_, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.workload)})

if diff := cmp.Diff(tc.wantError, gotError); diff != "" {
6 changes: 6 additions & 0 deletions pkg/controller/jobframework/interface.go
@@ -62,6 +62,12 @@ type JobWithReclaimablePods interface {
ReclaimablePods() ([]kueue.ReclaimablePod, error)
}

type ResizableJobs interface {
// IsResizable returns true if the job is being downsized.
// Only downsizing is supported for now; see KEP-77 (dynamically sized jobs).
IsResizable(wl *kueue.Workload) bool
}

type StopReason int

const (
20 changes: 16 additions & 4 deletions pkg/controller/jobframework/reconciler.go
@@ -360,6 +360,17 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}
}

// 4.1 update the Workload's pod set counts when the job (e.g. a RayCluster) has been resized (downsized)
if resizableJob, implementsResizable := job.(ResizableJobs); implementsResizable && resizableJob.IsResizable(wl) && workload.IsAdmitted(wl) {
if _, err := r.updateWorkloadToMatchJob(ctx, job, object, wl, "Updated Workload due to resize: %v"); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// 5. handle WaitForPodsReady only for a standalone job.
// handle a job when waitForPodsReady is enabled, and it is the main job
if r.waitForPodsReady {
@@ -586,7 +597,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
}

if toUpdate != nil {
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate)
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated not matching Workload for suspended job: %v")
}

return match, nil
@@ -691,7 +702,8 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob,
jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets())

if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil {
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) {
resizableJob, implementsResizable := job.(ResizableJobs)
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) || (implementsResizable && resizableJob.IsResizable(wl)) {
return true
}
// If the workload is admitted but the job is suspended, do the check
@@ -704,7 +716,7 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob,
return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets)
}

func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) {
func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, message string) (*kueue.Workload, error) {
newWl, err := r.constructWorkload(ctx, job, object)
if err != nil {
return nil, fmt.Errorf("can't construct workload for update: %w", err)
@@ -719,7 +731,7 @@ func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job Generi
}

r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedWorkload,
"Updated not matching Workload for suspended job: %v", klog.KObj(wl))
message, klog.KObj(wl))
return newWl, nil
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/controller/jobs/raycluster/raycluster_controller.go
@@ -28,6 +28,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
)

Expand Down Expand Up @@ -87,6 +88,20 @@ func (j *RayCluster) GVK() schema.GroupVersionKind {
return gvk
}

func (j *RayCluster) IsResizable(wl *kueue.Workload) bool {
if !features.Enabled(features.ResizableJobs) {
return false
}
pods := j.PodSets()
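// Skip the head pod set at index 0. The job counts as resized (downsized) when the
// Workload spec still carries a larger worker count than the RayCluster currently requests.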
for i := 1; i < len(pods); i++ {
if wl.Spec.PodSets[i].Count > pods[i].Count {
return true
}
}

return false
}

func (j *RayCluster) PodSets() []kueue.PodSet {
// len = workerGroups + head
podSets := make([]kueue.PodSet, len(j.Spec.WorkerGroupSpecs)+1)
Expand Down
5 changes: 5 additions & 0 deletions pkg/features/kube_features.go
@@ -83,6 +83,10 @@ const (
//
// Enables lending limit.
LendingLimit featuregate.Feature = "LendingLimit"
// owner: @vicenteferrara
// kep: https://github.com/kubernetes-sigs/kueue/tree/3347afb5a8681f4fd2a5f3b1a5be8c5c0ebac488/keps/77-dynamically-sized-jobs
// alpha: v0.8
//
// Enables resizing (currently only downsizing) of jobs.
ResizableJobs featuregate.Feature = "ResizableJobs"
)

func init() {
Expand All @@ -104,6 +108,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
PrioritySortingWithinCohort: {Default: true, PreRelease: featuregate.Beta},
MultiKueue: {Default: false, PreRelease: featuregate.Alpha},
LendingLimit: {Default: false, PreRelease: featuregate.Alpha},
ResizableJobs: {Default: false, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() {
Expand Down
