diff --git a/pkg/controller/core/clusterqueue_controller.go b/pkg/controller/core/clusterqueue_controller.go index 8d15d75b0a..757eb2185b 100644 --- a/pkg/controller/core/clusterqueue_controller.go +++ b/pkg/controller/core/clusterqueue_controller.go @@ -302,11 +302,12 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool { return true } defer r.notifyWatchers(oldCq, newCq) + specUpdated := !equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec) if err := r.cache.UpdateClusterQueue(newCq); err != nil { log.Error(err, "Failed to update clusterQueue in cache") } - if err := r.qManager.UpdateClusterQueue(context.Background(), newCq); err != nil { + if err := r.qManager.UpdateClusterQueue(context.Background(), newCq, specUpdated); err != nil { log.Error(err, "Failed to update clusterQueue in queue manager") } diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index ed6c17843f..0f1cade5ec 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -24,6 +24,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" nodev1 "k8s.io/api/node/v1" + "k8s.io/apimachinery/pkg/api/equality" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -544,6 +545,16 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { if !r.queues.AddOrUpdateWorkload(wlCopy) { log.V(2).Info("Queue for workload didn't exist; ignored for now") } + case prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods): + // trigger the move of associated inadmissibleWorkloads, if there are any. + r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() { + // Update the workload from cache while holding the queues lock + // to guarantee that requeued workloads are taken into account before + // the next scheduling cycle. + if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil { + log.Error(err, "Failed to delete workload from cache") + } + }) default: // Workload update in the cache is handled here; however, some fields are immutable diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 665e215c5d..74ded34f31 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -55,7 +55,7 @@ const ( var ( CQStatuses = []ClusterQueueStatus{CQStatusPending, CQStatusActive, CQStatusTerminating} - admissionAttemptsTotal = prometheus.NewCounterVec( + AdmissionAttemptsTotal = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: constants.KueueName, Name: "admission_attempts_total", @@ -177,7 +177,7 @@ For a ClusterQueue, the metric only reports a value of 1 for one of the statuses ) func AdmissionAttempt(result AdmissionResult, duration time.Duration) { - admissionAttemptsTotal.WithLabelValues(string(result)).Inc() + AdmissionAttemptsTotal.WithLabelValues(string(result)).Inc() admissionAttemptDuration.WithLabelValues(string(result)).Observe(duration.Seconds()) } @@ -290,7 +290,7 @@ func ClearClusterQueueResourceReservations(cqName, flavor, resource string) { func Register() { metrics.Registry.MustRegister( - admissionAttemptsTotal, + AdmissionAttemptsTotal, admissionAttemptDuration, PendingWorkloads, ReservingActiveWorkloads, diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index f7f0b33db7..14f6bb2581 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -141,7 +141,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e return nil } -func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error { +func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue, specUpdated bool) error { m.Lock() defer m.Unlock() cqImpl, ok := m.clusterQueues[cq.Name] @@ -162,7 +162,7 @@ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue // TODO(#8): Selectively move workloads based on the exact event. // If any workload becomes admissible or the queue becomes active. - if m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl) || (!oldActive && cqImpl.Active()) { + if (specUpdated && m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl)) || (!oldActive && cqImpl.Active()) { m.reportPendingWorkloads(cq.Name, cqImpl) m.Broadcast() } diff --git a/pkg/queue/manager_test.go b/pkg/queue/manager_test.go index c42509b836..f170f2072c 100644 --- a/pkg/queue/manager_test.go +++ b/pkg/queue/manager_test.go @@ -166,7 +166,7 @@ func TestUpdateClusterQueue(t *testing.T) { // Put cq2 in the same cohort as cq1. clusterQueues[1].Spec.Cohort = clusterQueues[0].Spec.Cohort - if err := manager.UpdateClusterQueue(ctx, clusterQueues[1]); err != nil { + if err := manager.UpdateClusterQueue(ctx, clusterQueues[1], true); err != nil { t.Fatalf("Failed to update ClusterQueue: %v", err) } @@ -225,7 +225,7 @@ func TestClusterQueueToActive(t *testing.T) { t.Fatalf("Failed adding clusterQueue %v", err) } - if err := manager.UpdateClusterQueue(ctx, runningCq); err != nil { + if err := manager.UpdateClusterQueue(ctx, runningCq, false); err != nil { t.Fatalf("Failed to update ClusterQueue: %v", err) } diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index ecd564322e..65b1b2eeb4 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -622,6 +622,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectPendingWorkloadsMetric(cq, 0, 1) util.ExpectReservingActiveWorkloadsMetric(cq, 0) util.ExpectAdmittedWorkloadsTotalMetric(cq, 0) + util.ExpectAdmissionAttemptsMetric(1, 0) ginkgo.By("updating ClusterQueue") updatedCq := &kueue.ClusterQueue{} @@ -645,6 +646,7 @@ var _ = ginkgo.Describe("Scheduler", func() { util.ExpectPendingWorkloadsMetric(cq, 0, 0) util.ExpectReservingActiveWorkloadsMetric(cq, 1) util.ExpectAdmittedWorkloadsTotalMetric(cq, 1) + util.ExpectAdmissionAttemptsMetric(1, 1) }) }) diff --git a/test/util/util.go b/test/util/util.go index 0dd35550bc..10efeb7e1a 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -371,6 +371,21 @@ func ExpectWorkloadToBeAdmittedAs(ctx context.Context, k8sClient client.Client, }, Timeout, Interval).Should(gomega.BeComparableTo(admission)) } +var attemptStatuses = []metrics.AdmissionResult{metrics.AdmissionResultInadmissible, metrics.AdmissionResultSuccess} + +func ExpectAdmissionAttemptsMetric(pending, admitted int) { + vals := []int{pending, admitted} + + for i, status := range attemptStatuses { + metric := metrics.AdmissionAttemptsTotal.WithLabelValues(string(status)) + gomega.EventuallyWithOffset(1, func() int { + v, err := testutil.GetCounterMetricValue(metric) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + return int(v) + }, Timeout, Interval).Should(gomega.Equal(vals[i]), "pending_workloads with status=%s", status) + } +} + var pendingStatuses = []string{metrics.PendingStatusActive, metrics.PendingStatusInadmissible} func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, active, inadmissible int) {