Skip to content

Commit

Permalink
Do not make API calls when the workload already has condition Admitte…
Browse files Browse the repository at this point in the history
…d=false (kubernetes-sigs#1820)

Change-Id: Ib10c1874bfb3f5810f83d751e4d80ca6eb031bfa
  • Loading branch information
alculquicondor authored and vsoch committed Apr 18, 2024
1 parent a8ba152 commit 529ef5e
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 36 deletions.
36 changes: 18 additions & 18 deletions pkg/controller/core/workload_controller.go
Expand Up @@ -217,25 +217,25 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

if !r.queues.QueueForWorkloadExists(&wl) {
switch {
case !r.queues.QueueForWorkloadExists(&wl):
log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName))
workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName))
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !cqOk {
if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) {
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
case !cqOk:
log.V(3).Info("Workload is inadmissible because of missing ClusterQueue", "clusterQueue", klog.KRef("", cqName))
workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName))
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !r.cache.ClusterQueueActive(cqName) {
if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName)) {
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
case !r.cache.ClusterQueueActive(cqName):
log.V(3).Info("Workload is inadmissible because ClusterQueue is inactive", "clusterQueue", klog.KRef("", cqName))
workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName))
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName)) {
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -293,13 +293,13 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont

if err != nil || !queue.DeletionTimestamp.IsZero() {
log.V(3).Info("Workload is inadmissible because the ClusterQueue is terminating or missing", "clusterQueue", klog.KRef("", cqName))
workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName))
_ = workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName))
return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
}

if queueStopPolicy != kueue.None {
log.V(3).Info("Workload is inadmissible because the ClusterQueue is stopped", "clusterQueue", klog.KRef("", cqName))
workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is stopped", cqName))
_ = workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is stopped", cqName))
return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/jobframework/reconciler.go
Expand Up @@ -372,8 +372,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
if workload.HasQuotaReservation(wl) {
if !job.IsActive() {
log.V(6).Info("The job is no longer active, clear the workloads admission")
workload.UnsetQuotaReservationWithCondition(wl, "Pending", evCond.Message)
_ = workload.SyncAdmittedCondition(wl)
_ = workload.UnsetQuotaReservationWithCondition(wl, "Pending", evCond.Message)
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err != nil {
return ctrl.Result{}, fmt.Errorf("clearing admission: %w", err)
Expand Down
9 changes: 5 additions & 4 deletions pkg/scheduler/scheduler.go
Expand Up @@ -586,10 +586,11 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent
log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added)

if e.status == notNominated || e.status == skipped {
workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg)
err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true)
if err != nil {
log.Error(err, "Could not update Workload status")
if workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg) {
err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true)
if err != nil {
log.Error(err, "Could not update Workload status")
}
}
s.recorder.Eventf(e.Obj, corev1.EventTypeNormal, "Pending", api.TruncateEventMessage(e.inadmissibleMsg))
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/scheduler/scheduler_test.go
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
Expand Down Expand Up @@ -2004,11 +2005,12 @@ func TestRequeueAndUpdate(t *testing.T) {
w1 := utiltesting.MakeWorkload("w1", "ns1").Queue(q1.Name).Obj()

cases := []struct {
name string
e entry
wantWorkloads map[string][]string
wantInadmissible map[string][]string
wantStatus kueue.WorkloadStatus
name string
e entry
wantWorkloads map[string][]string
wantInadmissible map[string][]string
wantStatus kueue.WorkloadStatus
wantStatusUpdates int
}{
{
name: "workload didn't fit",
Expand All @@ -2028,6 +2030,7 @@ func TestRequeueAndUpdate(t *testing.T) {
wantInadmissible: map[string][]string{
"cq": {workload.Key(w1)},
},
wantStatusUpdates: 1,
},
{
name: "assumed",
Expand Down Expand Up @@ -2068,6 +2071,7 @@ func TestRequeueAndUpdate(t *testing.T) {
wantWorkloads: map[string][]string{
"cq": {workload.Key(w1)},
},
wantStatusUpdates: 1,
},
}

Expand All @@ -2076,8 +2080,14 @@ func TestRequeueAndUpdate(t *testing.T) {
ctx, log := utiltesting.ContextWithLog(t)
scheme := runtime.NewScheme()

updates := 0
objs := []client.Object{w1, q1, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns1"}}}
cl := utiltesting.NewFakeClientSSAAsSM(objs...)
cl := utiltesting.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
SubResourcePatch: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
updates++
return utiltesting.TreatSSAAsStrategicMerge(ctx, client, subResourceName, obj, patch, opts...)
},
}).WithObjects(objs...).WithStatusSubresource(objs...).Build()
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: constants.AdmissionName})
cqCache := cache.New(cl)
Expand Down Expand Up @@ -2120,6 +2130,11 @@ func TestRequeueAndUpdate(t *testing.T) {
if diff := cmp.Diff(tc.wantStatus, updatedWl.Status, ignoreConditionTimestamps); diff != "" {
t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
}
// Make sure a second call doesn't make unnecessary updates.
scheduler.requeueAndUpdate(log, ctx, tc.e)
if updates != tc.wantStatusUpdates {
t.Errorf("Observed %d status updates, want %d", updates, tc.wantStatusUpdates)
}
})
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/workload/admissionchecks.go
Expand Up @@ -58,8 +58,7 @@ func SyncAdmittedCondition(w *kueue.Workload) bool {
newCondition.Message = "The workload has not all checks ready"
}

apimeta.SetStatusCondition(&w.Status.Conditions, newCondition)
return true
return apimeta.SetStatusCondition(&w.Status.Conditions, newCondition)
}

// FindAdmissionCheck - returns a pointer to the check identified by checkName if found in checks.
Expand Down
17 changes: 13 additions & 4 deletions pkg/workload/workload.go
Expand Up @@ -318,19 +318,28 @@ func UpdateStatus(ctx context.Context,
return c.Status().Patch(ctx, newWl, client.Apply, client.FieldOwner(managerPrefix+"-"+condition.Type))
}

func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message string) {
// UnsetQuotaReservationWithCondition sets the QuotaReserved condition to false and clears
// the admission.
// Returns whether any change was done.
func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message string) bool {
condition := metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: api.TruncateConditionMessage(message),
}
apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
wl.Status.Admission = nil
changed := apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
if wl.Status.Admission != nil {
wl.Status.Admission = nil
changed = true
}

// Reset the admitted condition if necessary.
_ = SyncAdmittedCondition(wl)
if SyncAdmittedCondition(wl) {
changed = true
}
return changed
}

// BaseSSAWorkload creates a new object based on the input workload that
Expand Down

0 comments on commit 529ef5e

Please sign in to comment.