Avoid requeue on ClusterQueue status update
Change-Id: Id028414a730a238549b7fddf5d42a2eafdf959c3
alculquicondor committed Mar 11, 2024
1 parent e5a769b commit de415e6
Showing 7 changed files with 37 additions and 8 deletions.
3 changes: 2 additions & 1 deletion pkg/controller/core/clusterqueue_controller.go
@@ -302,11 +302,12 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
         return true
     }
     defer r.notifyWatchers(oldCq, newCq)
+    specUpdated := !equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec)
 
     if err := r.cache.UpdateClusterQueue(newCq); err != nil {
         log.Error(err, "Failed to update clusterQueue in cache")
     }
-    if err := r.qManager.UpdateClusterQueue(context.Background(), newCq); err != nil {
+    if err := r.qManager.UpdateClusterQueue(context.Background(), newCq, specUpdated); err != nil {
         log.Error(err, "Failed to update clusterQueue in queue manager")
     }
 
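The specUpdated flag computed above is what lets the queue manager tell spec changes apart from status-only writes: a status update (for example, a refreshed pending-workloads count) leaves the spec deeply equal. A minimal standalone sketch of that distinction, assuming the kueue v1beta1 API types (the field values are illustrative, not from this diff):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/api/equality"
        kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
    )

    func main() {
        oldCq := &kueue.ClusterQueue{Spec: kueue.ClusterQueueSpec{Cohort: "team-a"}}

        // Status-only write: the spec stays deeply equal.
        newCq := oldCq.DeepCopy()
        newCq.Status.PendingWorkloads = 5
        fmt.Println(!equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec)) // false

        // Spec change: the flag flips, so inadmissible workloads may be requeued.
        newCq.Spec.Cohort = "team-b"
        fmt.Println(!equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec)) // true
    }
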
11 changes: 11 additions & 0 deletions pkg/controller/core/workload_controller.go
@@ -24,6 +24,7 @@ import (
     "github.com/go-logr/logr"
     corev1 "k8s.io/api/core/v1"
     nodev1 "k8s.io/api/node/v1"
+    "k8s.io/apimachinery/pkg/api/equality"
     apimeta "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -544,6 +545,16 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
         if !r.queues.AddOrUpdateWorkload(wlCopy) {
             log.V(2).Info("Queue for workload didn't exist; ignored for now")
         }
+    case prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods):
+        // Trigger the move of associated inadmissibleWorkloads, if there are any.
+        r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
+            // Update the workload in cache while holding the queues lock
+            // to guarantee that requeued workloads are taken into account before
+            // the next scheduling cycle.
+            if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil {
+                log.Error(err, "Failed to update workload in cache")
+            }
+        })
 
     default:
         // Workload update in the cache is handled here; however, some fields are immutable
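The new case fires when an admitted workload's reclaimable pod counts change, because reclaimed pods free quota that can make other workloads in the cohort admissible. The cache update is passed as a callback so that it executes while the queue manager holds its lock. Only the method's call site appears in this diff; the following is a simplified, hypothetical sketch of the pattern that signature implies, not the actual manager implementation:

    // Run an action under the manager's lock, then requeue the inadmissible
    // workloads associated with wl's ClusterQueue and cohort. Because the
    // action runs first, requeued workloads always observe the cache update
    // before the next scheduling cycle reads it.
    func (m *Manager) QueueAssociatedInadmissibleWorkloadsAfter(ctx context.Context, wl *kueue.Workload, action func()) {
        m.Lock()
        defer m.Unlock()
        if action != nil {
            action() // e.g. cache.UpdateWorkload(oldWl, wlCopy)
        }
        // ... move the associated inadmissible workloads back to the active queues ...
    }
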
6 changes: 3 additions & 3 deletions pkg/metrics/metrics.go
@@ -55,7 +55,7 @@ const (
 var (
     CQStatuses = []ClusterQueueStatus{CQStatusPending, CQStatusActive, CQStatusTerminating}
 
-    admissionAttemptsTotal = prometheus.NewCounterVec(
+    AdmissionAttemptsTotal = prometheus.NewCounterVec(
         prometheus.CounterOpts{
             Subsystem: constants.KueueName,
             Name:      "admission_attempts_total",
@@ -177,7 +177,7 @@ For a ClusterQueue, the metric only reports a value of 1 for one of the statuses
 )
 
 func AdmissionAttempt(result AdmissionResult, duration time.Duration) {
-    admissionAttemptsTotal.WithLabelValues(string(result)).Inc()
+    AdmissionAttemptsTotal.WithLabelValues(string(result)).Inc()
     admissionAttemptDuration.WithLabelValues(string(result)).Observe(duration.Seconds())
 }
 
@@ -290,7 +290,7 @@ func ClearClusterQueueResourceReservations(cqName, flavor, resource string) {
 
 func Register() {
     metrics.Registry.MustRegister(
-        admissionAttemptsTotal,
+        AdmissionAttemptsTotal,
         admissionAttemptDuration,
         PendingWorkloads,
         ReservingActiveWorkloads,
4 changes: 2 additions & 2 deletions pkg/queue/manager.go
@@ -141,7 +141,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
     return nil
 }
 
-func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error {
+func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue, specUpdated bool) error {
     m.Lock()
     defer m.Unlock()
     cqImpl, ok := m.clusterQueues[cq.Name]
@@ -162,7 +162,7 @@ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue
 
     // TODO(#8): Selectively move workloads based on the exact event.
     // If any workload becomes admissible or the queue becomes active.
-    if m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl) || (!oldActive && cqImpl.Active()) {
+    if (specUpdated && m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl)) || (!oldActive && cqImpl.Active()) {
         m.reportPendingWorkloads(cq.Name, cqImpl)
         m.Broadcast()
     }
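This condition is the change that gives the commit its title: because Go's && operator short-circuits, queueAllInadmissibleWorkloadsInCohort, which moves workloads and can wake the scheduler, is never even invoked when only the status changed. A standalone illustration of that evaluation order (the names here are placeholders, not kueue APIs):

    package main

    import "fmt"

    // requeueCohort stands in for queueAllInadmissibleWorkloadsInCohort:
    // it has side effects and reports whether any workload moved.
    func requeueCohort() bool {
        fmt.Println("requeueing inadmissible workloads in cohort")
        return true
    }

    func main() {
        specUpdated := false
        // && short-circuits: with specUpdated == false, requeueCohort never
        // runs, so a status-only update triggers no requeue and no broadcast.
        if specUpdated && requeueCohort() {
            fmt.Println("broadcasting to the scheduler")
        }
    }
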
4 changes: 2 additions & 2 deletions pkg/queue/manager_test.go
@@ -166,7 +166,7 @@ func TestUpdateClusterQueue(t *testing.T) {
 
     // Put cq2 in the same cohort as cq1.
     clusterQueues[1].Spec.Cohort = clusterQueues[0].Spec.Cohort
-    if err := manager.UpdateClusterQueue(ctx, clusterQueues[1]); err != nil {
+    if err := manager.UpdateClusterQueue(ctx, clusterQueues[1], true); err != nil {
         t.Fatalf("Failed to update ClusterQueue: %v", err)
     }
 
@@ -225,7 +225,7 @@ func TestClusterQueueToActive(t *testing.T) {
         t.Fatalf("Failed adding clusterQueue %v", err)
     }
 
-    if err := manager.UpdateClusterQueue(ctx, runningCq); err != nil {
+    if err := manager.UpdateClusterQueue(ctx, runningCq, false); err != nil {
         t.Fatalf("Failed to update ClusterQueue: %v", err)
     }
 
2 changes: 2 additions & 0 deletions test/integration/scheduler/scheduler_test.go
@@ -622,6 +622,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
             util.ExpectPendingWorkloadsMetric(cq, 0, 1)
             util.ExpectReservingActiveWorkloadsMetric(cq, 0)
             util.ExpectAdmittedWorkloadsTotalMetric(cq, 0)
+            util.ExpectAdmissionAttemptsMetric(1, 0)
 
             ginkgo.By("updating ClusterQueue")
             updatedCq := &kueue.ClusterQueue{}
@@ -645,6 +646,7 @@
             util.ExpectPendingWorkloadsMetric(cq, 0, 0)
             util.ExpectReservingActiveWorkloadsMetric(cq, 1)
             util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
+            util.ExpectAdmissionAttemptsMetric(1, 1)
         })
     })
 
15 changes: 15 additions & 0 deletions test/util/util.go
@@ -371,6 +371,21 @@ func ExpectWorkloadToBeAdmittedAs(ctx context.Context, k8sClient client.Client,
     }, Timeout, Interval).Should(gomega.BeComparableTo(admission))
 }
 
+var attemptStatuses = []metrics.AdmissionResult{metrics.AdmissionResultInadmissible, metrics.AdmissionResultSuccess}
+
+func ExpectAdmissionAttemptsMetric(pending, admitted int) {
+    vals := []int{pending, admitted}
+
+    for i, status := range attemptStatuses {
+        metric := metrics.AdmissionAttemptsTotal.WithLabelValues(string(status))
+        gomega.EventuallyWithOffset(1, func() int {
+            v, err := testutil.GetCounterMetricValue(metric)
+            gomega.Expect(err).ToNot(gomega.HaveOccurred())
+            return int(v)
+        }, Timeout, Interval).Should(gomega.Equal(vals[i]), "admission_attempts_total with status=%s", status)
+    }
+}
+
 var pendingStatuses = []string{metrics.PendingStatusActive, metrics.PendingStatusInadmissible}
 
 func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, active, inadmissible int) {
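For reference, the scheduler integration test above exercises the new helper at two points:

    // Before the ClusterQueue is updated: one admission attempt so far,
    // and it ended inadmissible.
    util.ExpectAdmissionAttemptsMetric(1, 0)

    // After the update admits the workload: the inadmissible count stays
    // at 1 and one successful attempt is recorded.
    util.ExpectAdmissionAttemptsMetric(1, 1)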
