From c502f99d60607ea82fd6385337d23782332a6596 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Mon, 11 Mar 2024 18:22:28 +0000 Subject: [PATCH] Do not make API calls when the workload already has condition Admitted=false Change-Id: Ib10c1874bfb3f5810f83d751e4d80ca6eb031bfa --- pkg/controller/core/workload_controller.go | 36 +++++++++++----------- pkg/controller/jobframework/reconciler.go | 3 +- pkg/scheduler/scheduler.go | 9 +++--- pkg/scheduler/scheduler_test.go | 27 ++++++++++++---- pkg/workload/admissionchecks.go | 3 +- pkg/workload/workload.go | 17 +++++++--- 6 files changed, 59 insertions(+), 36 deletions(-) diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index ed6c17843f..3603faa6e6 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -210,25 +210,25 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - if !r.queues.QueueForWorkloadExists(&wl) { + switch { + case !r.queues.QueueForWorkloadExists(&wl): log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName)) - workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) - err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) - return ctrl.Result{}, client.IgnoreNotFound(err) - } - - if !cqOk { + if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) { + err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) + return ctrl.Result{}, client.IgnoreNotFound(err) + } + case !cqOk: log.V(3).Info("Workload is inadmissible because of missing ClusterQueue", "clusterQueue", klog.KRef("", cqName)) - workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName)) - err 
:= workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) - return ctrl.Result{}, client.IgnoreNotFound(err) - } - - if !r.cache.ClusterQueueActive(cqName) { + if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName)) { + err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) + return ctrl.Result{}, client.IgnoreNotFound(err) + } + case !r.cache.ClusterQueueActive(cqName): log.V(3).Info("Workload is inadmissible because ClusterQueue is inactive", "clusterQueue", klog.KRef("", cqName)) - workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName)) - err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) - return ctrl.Result{}, client.IgnoreNotFound(err) + if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName)) { + err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true) + return ctrl.Result{}, client.IgnoreNotFound(err) + } } return ctrl.Result{}, nil @@ -286,13 +286,13 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont if err != nil || !queue.DeletionTimestamp.IsZero() { log.V(3).Info("Workload is inadmissible because the ClusterQueue is terminating or missing", "clusterQueue", klog.KRef("", cqName)) - workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName)) + _ = workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName)) return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true) } if queueStopPolicy != kueue.None { log.V(3).Info("Workload is inadmissible because the ClusterQueue is stopped", "clusterQueue", klog.KRef("", cqName)) - workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is stopped", cqName)) + _ = 
workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is stopped", cqName)) return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true) } diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 56c1e20c69..9e637b2588 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -374,8 +374,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques if workload.HasQuotaReservation(wl) { if !job.IsActive() { log.V(6).Info("The job is no longer active, clear the workloads admission") - workload.UnsetQuotaReservationWithCondition(wl, "Pending", evCond.Message) - _ = workload.SyncAdmittedCondition(wl) + _ = workload.UnsetQuotaReservationWithCondition(wl, "Pending", evCond.Message) err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true) if err != nil { return ctrl.Result{}, fmt.Errorf("clearing admission: %w", err) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index df184b1e29..9d61ad2ea5 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -586,10 +586,11 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added) if e.status == notNominated || e.status == skipped { - workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg) - err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true) - if err != nil { - log.Error(err, "Could not update Workload status") + if workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg) { + err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true) + if err != nil { + log.Error(err, "Could not update Workload status") + } } 
s.recorder.Eventf(e.Obj, corev1.EventTypeNormal, "Pending", api.TruncateEventMessage(e.inadmissibleMsg)) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 5260a63def..202766df5d 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" @@ -2004,11 +2005,12 @@ func TestRequeueAndUpdate(t *testing.T) { w1 := utiltesting.MakeWorkload("w1", "ns1").Queue(q1.Name).Obj() cases := []struct { - name string - e entry - wantWorkloads map[string][]string - wantInadmissible map[string][]string - wantStatus kueue.WorkloadStatus + name string + e entry + wantWorkloads map[string][]string + wantInadmissible map[string][]string + wantStatus kueue.WorkloadStatus + wantStatusUpdates int }{ { name: "workload didn't fit", @@ -2028,6 +2030,7 @@ func TestRequeueAndUpdate(t *testing.T) { wantInadmissible: map[string][]string{ "cq": {workload.Key(w1)}, }, + wantStatusUpdates: 1, }, { name: "assumed", @@ -2068,6 +2071,7 @@ func TestRequeueAndUpdate(t *testing.T) { wantWorkloads: map[string][]string{ "cq": {workload.Key(w1)}, }, + wantStatusUpdates: 1, }, } @@ -2076,8 +2080,14 @@ func TestRequeueAndUpdate(t *testing.T) { ctx, log := utiltesting.ContextWithLog(t) scheme := runtime.NewScheme() + updates := 0 objs := []client.Object{w1, q1, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns1"}}} - cl := utiltesting.NewFakeClientSSAAsSM(objs...) 
+ cl := utiltesting.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{ + SubResourcePatch: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error { + updates++ + return utiltesting.TreatSSAAsStrategicMerge(ctx, client, subResourceName, obj, patch, opts...) + }, + }).WithObjects(objs...).WithStatusSubresource(objs...).Build() broadcaster := record.NewBroadcaster() recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: constants.AdmissionName}) cqCache := cache.New(cl) @@ -2120,6 +2130,11 @@ func TestRequeueAndUpdate(t *testing.T) { if diff := cmp.Diff(tc.wantStatus, updatedWl.Status, ignoreConditionTimestamps); diff != "" { t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff) } + // Make sure a second call doesn't make unnecessary updates. + scheduler.requeueAndUpdate(log, ctx, tc.e) + if updates != tc.wantStatusUpdates { + t.Errorf("Observed %d status updates, want %d", updates, tc.wantStatusUpdates) + } }) } } diff --git a/pkg/workload/admissionchecks.go b/pkg/workload/admissionchecks.go index b1d3ebcb3d..e831144681 100644 --- a/pkg/workload/admissionchecks.go +++ b/pkg/workload/admissionchecks.go @@ -58,8 +58,7 @@ func SyncAdmittedCondition(w *kueue.Workload) bool { newCondition.Message = "The workload has not all checks ready" } - apimeta.SetStatusCondition(&w.Status.Conditions, newCondition) - return true + return apimeta.SetStatusCondition(&w.Status.Conditions, newCondition) } // FindAdmissionCheck - returns a pointer to the check identified by checkName if found in checks. 
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 13faead6d6..8ad0f571ad 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -318,7 +318,10 @@ func UpdateStatus(ctx context.Context, return c.Status().Patch(ctx, newWl, client.Apply, client.FieldOwner(managerPrefix+"-"+condition.Type)) } -func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message string) { +// UnsetQuotaReservationWithCondition sets the QuotaReserved condition to false and clears +// the admission. +// Returns whether any change was made. +func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message string) bool { condition := metav1.Condition{ Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionFalse, @@ -326,11 +329,17 @@ func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message stri Reason: reason, Message: api.TruncateConditionMessage(message), } - apimeta.SetStatusCondition(&wl.Status.Conditions, condition) - wl.Status.Admission = nil + changed := apimeta.SetStatusCondition(&wl.Status.Conditions, condition) + if wl.Status.Admission != nil { + wl.Status.Admission = nil + changed = true + } // Reset the admitted condition if necessary. - _ = SyncAdmittedCondition(wl) + if SyncAdmittedCondition(wl) { + changed = true + } + return changed } // BaseSSAWorkload creates a new object based on the input workload that