Avoid requeue on ClusterQueue status update (#1822)
* Avoid requeue on ClusterQueue status update

Change-Id: Id028414a730a238549b7fddf5d42a2eafdf959c3

* Reclaimable pods can move a workload out of inadmissible

Change-Id: Id52ef3c92abd84317c4a7a71c3a20f6f28d67d25

* Delay requeueing based on RequeueState

Change-Id: I5431e741cb30541a846b65fb26aa0b7b631eb80d
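
For context, the delayed requeue keys off the Workload's `.status.requeueState`. A minimal sketch of the shape this change relies on — the field names are inferred from the diff below and from the kueue v1beta1 API, so treat them as assumptions:

    // Sketch of the status field behind the delayed requeue. Only RequeueAt
    // is read in this commit; Count is assumed to track how many times the
    // workload has been requeued.
    type RequeueState struct {
        Count     *int32       `json:"count,omitempty"`
        RequeueAt *metav1.Time `json:"requeueAt,omitempty"`
    }

    // The controller turns the absolute timestamp into a relative backoff:
    //     backoff := time.Until(wl.Status.RequeueState.RequeueAt.Time)
    // A non-positive backoff means the workload can re-enter the queue immediately.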
alculquicondor committed Mar 12, 2024
1 parent 604dc07 commit d755fb9
Showing 9 changed files with 72 additions and 10 deletions.
3 changes: 2 additions & 1 deletion pkg/controller/core/clusterqueue_controller.go
@@ -302,11 +302,12 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
return true
}
defer r.notifyWatchers(oldCq, newCq)
+ specUpdated := !equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec)

if err := r.cache.UpdateClusterQueue(newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in cache")
}
- if err := r.qManager.UpdateClusterQueue(context.Background(), newCq); err != nil {
+ if err := r.qManager.UpdateClusterQueue(context.Background(), newCq, specUpdated); err != nil {
log.Error(err, "Failed to update clusterQueue in queue manager")
}

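
The effect of this hunk: only spec changes broadcast to the inadmissible workloads in the cohort. A hedged sketch of the distinction (`cq`, `qManager`, `ctx`, and `log` are assumed surroundings, not code from this commit):

    // A status-only update yields specUpdated == false, so the queue manager
    // skips the cohort-wide requeue of inadmissible workloads.
    oldCq := cq.DeepCopy()
    newCq := cq.DeepCopy()
    newCq.Status.PendingWorkloads = 5 // status drifts; spec is untouched

    specUpdated := !equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec) // false here
    if err := qManager.UpdateClusterQueue(ctx, newCq, specUpdated); err != nil {
        log.Error(err, "Failed to update clusterQueue in queue manager")
    }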
34 changes: 32 additions & 2 deletions pkg/controller/core/workload_controller.go
@@ -24,6 +24,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
nodev1 "k8s.io/api/node/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -541,9 +542,38 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
log.Error(err, "Failed to delete workload from cache")
}
})
- if !r.queues.AddOrUpdateWorkload(wlCopy) {
-     log.V(2).Info("Queue for workload didn't exist; ignored for now")
+ var backoff time.Duration
+ if wlCopy.Status.RequeueState != nil && wlCopy.Status.RequeueState.RequeueAt != nil {
+     backoff = time.Until(wl.Status.RequeueState.RequeueAt.Time)
+ }
+ if backoff <= 0 {
+     if !r.queues.AddOrUpdateWorkload(wlCopy) {
+         log.V(2).Info("Queue for workload didn't exist; ignored for now")
+     }
+ } else {
+     log.V(3).Info("Workload to be requeued after backoff", "backoff", backoff, "requeueAt", wl.Status.RequeueState.RequeueAt.Time)
+     time.AfterFunc(backoff, func() {
+         updatedWl := kueue.Workload{}
+         err := r.client.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWl)
+         if err == nil && workloadStatus(&updatedWl) == pending {
+             if !r.queues.AddOrUpdateWorkload(wlCopy) {
+                 log.V(2).Info("Queue for workload didn't exist; ignored for now")
+             } else {
+                 log.V(3).Info("Workload requeued after backoff")
+             }
+         }
+     })
}
+ case prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods):
+     // Trigger the move of associated inadmissibleWorkloads, if there are any.
+     r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
+         // Update the workload in the cache while holding the queues lock
+         // to guarantee that requeued workloads are taken into account before
+         // the next scheduling cycle.
+         if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil {
+             log.Error(err, "Failed to update workload in cache")
+         }
+     })

default:
// Workload update in the cache is handled here; however, some fields are immutable
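
The shape of the delayed requeue above is worth spelling out: the backoff is waited out off the reconcile path, and the workload is re-read before queueing so a stale copy is never acted on. A condensed sketch, where `requeue` is a hypothetical stand-in for `r.queues.AddOrUpdateWorkload`:

    // Wait out the backoff, then confirm the workload still exists and is
    // still pending before putting it back in the queue.
    time.AfterFunc(backoff, func() {
        var fresh kueue.Workload
        if err := r.client.Get(ctx, client.ObjectKeyFromObject(wl), &fresh); err != nil {
            return // e.g. the workload was deleted while backing off
        }
        if workloadStatus(&fresh) == pending {
            requeue(wl) // hypothetical; stands in for r.queues.AddOrUpdateWorkload(wlCopy)
        }
    })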
6 changes: 3 additions & 3 deletions pkg/metrics/metrics.go
@@ -55,7 +55,7 @@ const (
var (
CQStatuses = []ClusterQueueStatus{CQStatusPending, CQStatusActive, CQStatusTerminating}

- admissionAttemptsTotal = prometheus.NewCounterVec(
+ AdmissionAttemptsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "admission_attempts_total",
@@ -177,7 +177,7 @@ For a ClusterQueue, the metric only reports a value of 1 for one of the statuses
)

func AdmissionAttempt(result AdmissionResult, duration time.Duration) {
- admissionAttemptsTotal.WithLabelValues(string(result)).Inc()
+ AdmissionAttemptsTotal.WithLabelValues(string(result)).Inc()
admissionAttemptDuration.WithLabelValues(string(result)).Observe(duration.Seconds())
}

@@ -290,7 +290,7 @@ func ClearClusterQueueResourceReservations(cqName, flavor, resource string) {

func Register() {
metrics.Registry.MustRegister(
- admissionAttemptsTotal,
+ AdmissionAttemptsTotal,
admissionAttemptDuration,
PendingWorkloads,
ReservingActiveWorkloads,
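
The only change in this file is visibility: `admissionAttemptsTotal` becomes the exported `AdmissionAttemptsTotal` so the integration tests further down can reset and read it. Roughly, on the test side (assuming `testutil` here is `k8s.io/component-base/metrics/testutil`, as used by `test/util/util.go` below):

    // Test-side usage of the now-exported counter.
    metrics.AdmissionAttemptsTotal.Reset()
    // ... drive the scheduler ...
    m := metrics.AdmissionAttemptsTotal.WithLabelValues(string(metrics.AdmissionResultSuccess))
    v, err := testutil.GetCounterMetricValue(m)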
1 change: 1 addition & 0 deletions pkg/queue/cluster_queue_impl.go
@@ -119,6 +119,7 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
// to potentially become admissible, unless the Eviction status changed
// which can affect the workloads order in the queue.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) &&
+ equality.Semantic.DeepEqual(oldInfo.Obj.Status.ReclaimablePods, wInfo.Obj.Status.ReclaimablePods) &&
equality.Semantic.DeepEqual(apimeta.FindStatusCondition(oldInfo.Obj.Status.Conditions, kueue.WorkloadEvicted),
apimeta.FindStatusCondition(wInfo.Obj.Status.Conditions, kueue.WorkloadEvicted)) {
c.inadmissibleWorkloads[key] = wInfo
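
The extra `DeepEqual` on `ReclaimablePods` matters because reclaimable pods shrink a workload's effective resource request, which can make a previously inadmissible workload fit; such an update must push it back onto the active heap instead of parking it as inadmissible again. The new test case below builds exactly that situation:

    // From the test below: 2 pods of 1 CPU each, 1 of them reclaimable,
    // is effectively a 1-CPU request.
    wl := utiltesting.MakeWorkload("w", "").
        PodSets(*utiltesting.MakePodSet("main", 2).Request(corev1.ResourceCPU, "1").Obj()).
        ReclaimablePods(kueue.ReclaimablePod{Name: "main", Count: 1}).
        Obj()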
12 changes: 12 additions & 0 deletions pkg/queue/cluster_queue_impl_test.go
@@ -328,6 +328,18 @@ func TestClusterQueueImpl(t *testing.T) {
inadmissibleWorkloadsToRequeue: []*workload.Info{workload.NewInfo(workloads[1]), workload.NewInfo(workloads[1])},
wantPending: 1,
},
"update reclaimable pods in inadmissible": {
inadmissibleWorkloadsToRequeue: []*workload.Info{
workload.NewInfo(utiltesting.MakeWorkload("w", "").PodSets(*utiltesting.MakePodSet("main", 1).Request(corev1.ResourceCPU, "1").Obj()).Obj()),
},
workloadsToUpdate: []*kueue.Workload{
utiltesting.MakeWorkload("w", "").PodSets(*utiltesting.MakePodSet("main", 2).Request(corev1.ResourceCPU, "1").Obj()).
ReclaimablePods(kueue.ReclaimablePod{Name: "main", Count: 1}).
Obj(),
},
wantActiveWorkloads: []string{"/w"},
wantPending: 1,
},
}

for name, test := range tests {
4 changes: 2 additions & 2 deletions pkg/queue/manager.go
@@ -141,7 +141,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
return nil
}

- func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error {
+ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue, specUpdated bool) error {
m.Lock()
defer m.Unlock()
cqImpl, ok := m.clusterQueues[cq.Name]
@@ -162,7 +162,7 @@ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue

// TODO(#8): Selectively move workloads based on the exact event.
// If any workload becomes admissible or the queue becomes active.
- if m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl) || (!oldActive && cqImpl.Active()) {
+ if (specUpdated && m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl)) || (!oldActive && cqImpl.Active()) {
m.reportPendingWorkloads(cq.Name, cqImpl)
m.Broadcast()
}
4 changes: 2 additions & 2 deletions pkg/queue/manager_test.go
@@ -166,7 +166,7 @@ func TestUpdateClusterQueue(t *testing.T) {

// Put cq2 in the same cohort as cq1.
clusterQueues[1].Spec.Cohort = clusterQueues[0].Spec.Cohort
- if err := manager.UpdateClusterQueue(ctx, clusterQueues[1]); err != nil {
+ if err := manager.UpdateClusterQueue(ctx, clusterQueues[1], true); err != nil {
t.Fatalf("Failed to update ClusterQueue: %v", err)
}

@@ -225,7 +225,7 @@ func TestClusterQueueToActive(t *testing.T) {
t.Fatalf("Failed adding clusterQueue %v", err)
}

- if err := manager.UpdateClusterQueue(ctx, runningCq); err != nil {
+ if err := manager.UpdateClusterQueue(ctx, runningCq, false); err != nil {
t.Fatalf("Failed to update ClusterQueue: %v", err)
}

3 changes: 3 additions & 0 deletions test/integration/scheduler/scheduler_test.go
@@ -616,12 +616,14 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
})
ginkgo.It("Should re-enqueue by the update event of ClusterQueue", func() {
+ metrics.AdmissionAttemptsTotal.Reset()
wl := testing.MakeWorkload("on-demand-wl", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "6").Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed())
util.ExpectWorkloadsToBePending(ctx, k8sClient, wl)
util.ExpectPendingWorkloadsMetric(cq, 0, 1)
util.ExpectReservingActiveWorkloadsMetric(cq, 0)
util.ExpectAdmittedWorkloadsTotalMetric(cq, 0)
+ util.ExpectAdmissionAttemptsMetric(1, 0)

ginkgo.By("updating ClusterQueue")
updatedCq := &kueue.ClusterQueue{}
@@ -645,6 +647,7 @@
util.ExpectPendingWorkloadsMetric(cq, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(cq, 1)
util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
+ util.ExpectAdmissionAttemptsMetric(1, 1)
})
})

15 changes: 15 additions & 0 deletions test/util/util.go
@@ -371,6 +371,21 @@ func ExpectWorkloadToBeAdmittedAs(ctx context.Context, k8sClient client.Client,
}, Timeout, Interval).Should(gomega.BeComparableTo(admission))
}

+ var attemptStatuses = []metrics.AdmissionResult{metrics.AdmissionResultInadmissible, metrics.AdmissionResultSuccess}
+
+ func ExpectAdmissionAttemptsMetric(pending, admitted int) {
+     vals := []int{pending, admitted}
+
+     for i, status := range attemptStatuses {
+         metric := metrics.AdmissionAttemptsTotal.WithLabelValues(string(status))
+         gomega.EventuallyWithOffset(1, func() int {
+             v, err := testutil.GetCounterMetricValue(metric)
+             gomega.Expect(err).ToNot(gomega.HaveOccurred())
+             return int(v)
+         }, Timeout, Interval).Should(gomega.Equal(vals[i]), "admission_attempts_total with status=%s", status)
+     }
+ }

var pendingStatuses = []string{metrics.PendingStatusActive, metrics.PendingStatusInadmissible}

func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, active, inadmissible int) {
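
For reference, the scheduler test above uses the new helper in pairs: one inadmissible attempt before the ClusterQueue is resized, then one success after it.

    util.ExpectAdmissionAttemptsMetric(1, 0) // before: 1 inadmissible, 0 successful
    // ... resize the ClusterQueue ...
    util.ExpectAdmissionAttemptsMetric(1, 1) // after: the retry succeeds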
