Do not make API calls when the workload already has condition Admitted=false #1820

Merged · 1 commit · Mar 14, 2024
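The point of the change: helpers such as `workload.UnsetQuotaReservationWithCondition` and `workload.SyncAdmittedCondition` now report whether they modified the Workload's status, and callers issue the status patch (`workload.ApplyAdmissionStatus`) only when something changed. The detection relies on `apimeta.SetStatusCondition`, which returns false when the condition is already in the desired state. A minimal, self-contained sketch of that pattern (the condition type and printed actions are simplified stand-ins, not Kueue's reconciler code):

```go
package main

import (
	"fmt"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conditions []metav1.Condition

	cond := metav1.Condition{
		Type:   "QuotaReserved",
		Status: metav1.ConditionFalse,
		Reason: "Inadmissible",
	}

	// SetStatusCondition reports whether it added or modified the condition,
	// so the caller can decide whether a status PATCH is needed at all.
	if apimeta.SetStatusCondition(&conditions, cond) {
		fmt.Println("condition changed: would PATCH the status subresource")
	}

	// A second call with the same condition changes nothing, so the caller
	// skips the API request entirely.
	if !apimeta.SetStatusCondition(&conditions, cond) {
		fmt.Println("no change: skipping the redundant PATCH")
	}
}
```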
36 changes: 18 additions & 18 deletions pkg/controller/core/workload_controller.go
@@ -210,25 +210,25 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, err
 	}
 
-	if !r.queues.QueueForWorkloadExists(&wl) {
+	switch {
+	case !r.queues.QueueForWorkloadExists(&wl):
 		log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName))
-		workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName))
-		err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
-		return ctrl.Result{}, client.IgnoreNotFound(err)
-	}
-
-	if !cqOk {
+		if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) {
+			err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
+			return ctrl.Result{}, client.IgnoreNotFound(err)
+		}
+	case !cqOk:
 		log.V(3).Info("Workload is inadmissible because of missing ClusterQueue", "clusterQueue", klog.KRef("", cqName))
-		workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName))
-		err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
-		return ctrl.Result{}, client.IgnoreNotFound(err)
-	}
-
-	if !r.cache.ClusterQueueActive(cqName) {
+		if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName)) {
+			err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
+			return ctrl.Result{}, client.IgnoreNotFound(err)
+		}
+	case !r.cache.ClusterQueueActive(cqName):
 		log.V(3).Info("Workload is inadmissible because ClusterQueue is inactive", "clusterQueue", klog.KRef("", cqName))
-		workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName))
-		err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
-		return ctrl.Result{}, client.IgnoreNotFound(err)
+		if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName)) {
+			err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
+			return ctrl.Result{}, client.IgnoreNotFound(err)
+		}
 	}
 
 	return ctrl.Result{}, nil
@@ -286,13 +286,13 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
 
 	if err != nil || !queue.DeletionTimestamp.IsZero() {
 		log.V(3).Info("Workload is inadmissible because the ClusterQueue is terminating or missing", "clusterQueue", klog.KRef("", cqName))
-		workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName))
+		_ = workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is terminating or missing", cqName))
 		return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
 	}
 
 	if queueStopPolicy != kueue.None {
 		log.V(3).Info("Workload is inadmissible because the ClusterQueue is stopped", "clusterQueue", klog.KRef("", cqName))
-		workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is stopped", cqName))
+		_ = workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is stopped", cqName))
		return true, workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
 	}
 
3 changes: 1 addition & 2 deletions pkg/controller/jobframework/reconciler.go
@@ -374,8 +374,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
 	if workload.HasQuotaReservation(wl) {
 		if !job.IsActive() {
 			log.V(6).Info("The job is no longer active, clear the workloads admission")
-			workload.UnsetQuotaReservationWithCondition(wl, "Pending", evCond.Message)
-			_ = workload.SyncAdmittedCondition(wl)
+			_ = workload.UnsetQuotaReservationWithCondition(wl, "Pending", evCond.Message)
 			err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
 			if err != nil {
 				return ctrl.Result{}, fmt.Errorf("clearing admission: %w", err)
9 changes: 5 additions & 4 deletions pkg/scheduler/scheduler.go
@@ -586,10 +586,11 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent
 	log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added)
 
 	if e.status == notNominated || e.status == skipped {
-		workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg)
-		err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true)
-		if err != nil {
-			log.Error(err, "Could not update Workload status")
+		if workload.UnsetQuotaReservationWithCondition(e.Obj, "Pending", e.inadmissibleMsg) {
+			err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true)
+			if err != nil {
+				log.Error(err, "Could not update Workload status")
+			}
 		}
 		s.recorder.Eventf(e.Obj, corev1.EventTypeNormal, "Pending", api.TruncateEventMessage(e.inadmissibleMsg))
 	}
27 changes: 21 additions & 6 deletions pkg/scheduler/scheduler_test.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
 
 	config "sigs.k8s.io/kueue/apis/config/v1beta1"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
@@ -2004,11 +2005,12 @@ func TestRequeueAndUpdate(t *testing.T) {
 	w1 := utiltesting.MakeWorkload("w1", "ns1").Queue(q1.Name).Obj()
 
 	cases := []struct {
-		name             string
-		e                entry
-		wantWorkloads    map[string][]string
-		wantInadmissible map[string][]string
-		wantStatus       kueue.WorkloadStatus
+		name              string
+		e                 entry
+		wantWorkloads     map[string][]string
+		wantInadmissible  map[string][]string
+		wantStatus        kueue.WorkloadStatus
+		wantStatusUpdates int
 	}{
 		{
 			name: "workload didn't fit",
@@ -2028,6 +2030,7 @@
 			wantInadmissible: map[string][]string{
 				"cq": {workload.Key(w1)},
 			},
+			wantStatusUpdates: 1,
 		},
 		{
 			name: "assumed",
@@ -2068,6 +2071,7 @@
 			wantWorkloads: map[string][]string{
 				"cq": {workload.Key(w1)},
 			},
+			wantStatusUpdates: 1,
 		},
 	}
 
@@ -2076,8 +2080,14 @@
 			ctx, log := utiltesting.ContextWithLog(t)
 			scheme := runtime.NewScheme()
 
+			updates := 0
 			objs := []client.Object{w1, q1, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns1"}}}
-			cl := utiltesting.NewFakeClientSSAAsSM(objs...)
+			cl := utiltesting.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
+				SubResourcePatch: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
+					updates++
+					return utiltesting.TreatSSAAsStrategicMerge(ctx, client, subResourceName, obj, patch, opts...)
+				},
+			}).WithObjects(objs...).WithStatusSubresource(objs...).Build()
 			broadcaster := record.NewBroadcaster()
 			recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: constants.AdmissionName})
 			cqCache := cache.New(cl)
@@ -2120,6 +2130,11 @@
 			if diff := cmp.Diff(tc.wantStatus, updatedWl.Status, ignoreConditionTimestamps); diff != "" {
 				t.Errorf("Unexpected status after updating (-want,+got):\n%s", diff)
 			}
+			// Make sure a second call doesn't make unnecessary updates.
+			scheduler.requeueAndUpdate(log, ctx, tc.e)
+			if updates != tc.wantStatusUpdates {
+				t.Errorf("Observed %d status updates, want %d", updates, tc.wantStatusUpdates)
+			}
 		})
 	}
 }
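The test counts API calls by intercepting subresource PATCH requests on the fake client. A rough, generic sketch of the same technique using controller-runtime's fake client builder directly, with a Pod as a stand-in for a Kueue Workload (the test above uses Kueue's `utiltesting` wrappers instead; only the interceptor hook is the point here):

```go
package example

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
)

func TestCountStatusPatches(t *testing.T) {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "ns"}}

	updates := 0
	cl := fake.NewClientBuilder().
		WithScheme(clientgoscheme.Scheme).
		WithInterceptorFuncs(interceptor.Funcs{
			// Every subresource PATCH (including status) passes through here
			// before being delegated to the underlying fake client.
			SubResourcePatch: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
				updates++
				return c.SubResource(subResourceName).Patch(ctx, obj, patch, opts...)
			},
		}).
		WithObjects(pod).
		WithStatusSubresource(pod).
		Build()

	// One status patch should be observed by the interceptor.
	orig := pod.DeepCopy()
	pod.Status.Phase = corev1.PodRunning
	if err := cl.Status().Patch(context.Background(), pod, client.MergeFrom(orig)); err != nil {
		t.Fatalf("patching status: %v", err)
	}
	if updates != 1 {
		t.Errorf("observed %d status patches, want 1", updates)
	}
}
```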
3 changes: 1 addition & 2 deletions pkg/workload/admissionchecks.go
@@ -58,8 +58,7 @@ func SyncAdmittedCondition(w *kueue.Workload) bool {
 		newCondition.Message = "The workload has not all checks ready"
 	}
 
-	apimeta.SetStatusCondition(&w.Status.Conditions, newCondition)
-	return true
+	return apimeta.SetStatusCondition(&w.Status.Conditions, newCondition)
 }
 
 // FindAdmissionCheck - returns a pointer to the check identified by checkName if found in checks.
17 changes: 13 additions & 4 deletions pkg/workload/workload.go
@@ -318,19 +318,28 @@ func UpdateStatus(ctx context.Context,
 	return c.Status().Patch(ctx, newWl, client.Apply, client.FieldOwner(managerPrefix+"-"+condition.Type))
 }
 
-func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message string) {
+// UnsetQuotaReservationWithCondition sets the QuotaReserved condition to false and clears
+// the admission.
+// Returns whether any change was done.
+func UnsetQuotaReservationWithCondition(wl *kueue.Workload, reason, message string) bool {
 	condition := metav1.Condition{
 		Type:               kueue.WorkloadQuotaReserved,
 		Status:             metav1.ConditionFalse,
 		LastTransitionTime: metav1.Now(),
 		Reason:             reason,
 		Message:            api.TruncateConditionMessage(message),
 	}
-	apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
-	wl.Status.Admission = nil
+	changed := apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
+	if wl.Status.Admission != nil {
+		wl.Status.Admission = nil
+		changed = true
+	}
 
 	// Reset the admitted condition if necessary.
-	_ = SyncAdmittedCondition(wl)
+	if SyncAdmittedCondition(wl) {
+		changed = true
+	}
+	return changed
 }
 
 // BaseSSAWorkload creates a new object based on the input workload that
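With the new boolean return, the expected calling convention is to apply the admission status only when the helper reports a change. A hypothetical caller illustrating that contract (the `markInadmissible` wrapper and its message are made up; the two `workload` functions and their signatures are the ones touched by this PR):

```go
package example

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/pkg/workload"
)

// markInadmissible clears the quota reservation and admission for wl, and
// patches the status only if anything actually changed; if QuotaReserved and
// Admitted were already false with the same reason and message, no API
// request is made.
func markInadmissible(ctx context.Context, c client.Client, wl *kueue.Workload, msg string) error {
	if workload.UnsetQuotaReservationWithCondition(wl, "Inadmissible", msg) {
		if err := workload.ApplyAdmissionStatus(ctx, c, wl, true); err != nil {
			return fmt.Errorf("clearing admission: %w", err)
		}
	}
	return nil
}
```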