Avoid requeue on ClusterQueue status update #1822

Merged: 3 commits, Mar 12, 2024
Changes from 2 commits
3 changes: 2 additions & 1 deletion pkg/controller/core/clusterqueue_controller.go
@@ -302,11 +302,12 @@ func (r *ClusterQueueReconciler) Update(e event.UpdateEvent) bool {
return true
}
defer r.notifyWatchers(oldCq, newCq)
+ specUpdated := !equality.Semantic.DeepEqual(oldCq.Spec, newCq.Spec)

if err := r.cache.UpdateClusterQueue(newCq); err != nil {
log.Error(err, "Failed to update clusterQueue in cache")
}
- if err := r.qManager.UpdateClusterQueue(context.Background(), newCq); err != nil {
+ if err := r.qManager.UpdateClusterQueue(context.Background(), newCq, specUpdated); err != nil {
log.Error(err, "Failed to update clusterQueue in queue manager")
}

11 changes: 11 additions & 0 deletions pkg/controller/core/workload_controller.go
@@ -24,6 +24,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
nodev1 "k8s.io/api/node/v1"
+ "k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -544,6 +545,16 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}
+ case prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods):
+ // trigger the move of associated inadmissibleWorkloads, if there are any.
+ r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, wl, func() {
+ // Update the workload in the cache while holding the queues lock
+ // to guarantee that requeued workloads are taken into account before
+ // the next scheduling cycle.
+ if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil {
+ log.Error(err, "Failed to update workload in cache")
+ }
+ })

default:
// Workload update in the cache is handled here; however, some fields are immutable
6 changes: 3 additions & 3 deletions pkg/metrics/metrics.go
@@ -55,7 +55,7 @@ const (
var (
CQStatuses = []ClusterQueueStatus{CQStatusPending, CQStatusActive, CQStatusTerminating}

- admissionAttemptsTotal = prometheus.NewCounterVec(
+ AdmissionAttemptsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: constants.KueueName,
Name: "admission_attempts_total",
@@ -177,7 +177,7 @@ For a ClusterQueue, the metric only reports a value of 1 for one of the statuses
)

func AdmissionAttempt(result AdmissionResult, duration time.Duration) {
- admissionAttemptsTotal.WithLabelValues(string(result)).Inc()
+ AdmissionAttemptsTotal.WithLabelValues(string(result)).Inc()
admissionAttemptDuration.WithLabelValues(string(result)).Observe(duration.Seconds())
}

Expand Down Expand Up @@ -290,7 +290,7 @@ func ClearClusterQueueResourceReservations(cqName, flavor, resource string) {

func Register() {
metrics.Registry.MustRegister(
- admissionAttemptsTotal,
+ AdmissionAttemptsTotal,
admissionAttemptDuration,
PendingWorkloads,
ReservingActiveWorkloads,
1 change: 1 addition & 0 deletions pkg/queue/cluster_queue_impl.go
@@ -119,6 +119,7 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
// to potentially become admissible, unless the Eviction status changed
// which can affect the workloads order in the queue.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) &&
+ equality.Semantic.DeepEqual(oldInfo.Obj.Status.ReclaimablePods, wInfo.Obj.Status.ReclaimablePods) &&
Member: Should we extend this UT: func TestClusterQueueImpl(t *testing.T)?

Contributor Author: added

equality.Semantic.DeepEqual(apimeta.FindStatusCondition(oldInfo.Obj.Status.Conditions, kueue.WorkloadEvicted),
apimeta.FindStatusCondition(wInfo.Obj.Status.Conditions, kueue.WorkloadEvicted)) {
c.inadmissibleWorkloads[key] = wInfo
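As an aside on the condition discussed above, here is a minimal, self-contained sketch of the requeue-skip logic that the new ReclaimablePods clause changes. The workloadInfo struct and shouldStayInadmissible helper below are simplified stand-ins, not Kueue's workload.Info or the unit test case that was added, so treat it purely as an illustration of the decision PushOrUpdate makes:

package main

import (
	"fmt"
	"reflect"
)

// workloadInfo is a simplified stand-in for the fields the PushOrUpdate
// condition compares between the old and the updated workload.
type workloadInfo struct {
	Spec            string         // stands in for the full workload spec
	ReclaimablePods map[string]int // pod set name -> reclaimable pod count
	Evicted         bool           // stands in for the WorkloadEvicted condition
}

// shouldStayInadmissible mirrors the updated condition: keep the workload in
// inadmissibleWorkloads only if nothing that affects admissibility or queue
// order changed; otherwise it is pushed back to the heap (requeued).
func shouldStayInadmissible(oldInfo, newInfo workloadInfo) bool {
	return reflect.DeepEqual(oldInfo.Spec, newInfo.Spec) &&
		reflect.DeepEqual(oldInfo.ReclaimablePods, newInfo.ReclaimablePods) &&
		oldInfo.Evicted == newInfo.Evicted
}

func main() {
	oldInfo := workloadInfo{Spec: "cpu:6", ReclaimablePods: map[string]int{"main": 0}}

	// Only ReclaimablePods changed: with this PR the workload is requeued,
	// since the pods it can reclaim may now let it fit in the quota.
	newInfo := oldInfo
	newInfo.ReclaimablePods = map[string]int{"main": 2}
	fmt.Println(shouldStayInadmissible(oldInfo, newInfo)) // false -> requeue

	// A status touch with no relevant change keeps it in inadmissibleWorkloads.
	fmt.Println(shouldStayInadmissible(oldInfo, oldInfo)) // true -> stay inadmissible
}
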
4 changes: 2 additions & 2 deletions pkg/queue/manager.go
@@ -141,7 +141,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e
return nil
}

- func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) error {
+ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue, specUpdated bool) error {
m.Lock()
defer m.Unlock()
cqImpl, ok := m.clusterQueues[cq.Name]
@@ -162,7 +162,7 @@ func (m *Manager) UpdateClusterQueue(ctx context.Context, cq *kueue.ClusterQueue

// TODO(#8): Selectively move workloads based on the exact event.
// If any workload becomes admissible or the queue becomes active.
- if m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl) || (!oldActive && cqImpl.Active()) {
+ if (specUpdated && m.queueAllInadmissibleWorkloadsInCohort(ctx, cqImpl)) || (!oldActive && cqImpl.Active()) {
m.reportPendingWorkloads(cq.Name, cqImpl)
m.Broadcast()
}
4 changes: 2 additions & 2 deletions pkg/queue/manager_test.go
@@ -166,7 +166,7 @@ func TestUpdateClusterQueue(t *testing.T) {

// Put cq2 in the same cohort as cq1.
clusterQueues[1].Spec.Cohort = clusterQueues[0].Spec.Cohort
- if err := manager.UpdateClusterQueue(ctx, clusterQueues[1]); err != nil {
+ if err := manager.UpdateClusterQueue(ctx, clusterQueues[1], true); err != nil {
t.Fatalf("Failed to update ClusterQueue: %v", err)
}

@@ -225,7 +225,7 @@ func TestClusterQueueToActive(t *testing.T) {
t.Fatalf("Failed adding clusterQueue %v", err)
}

- if err := manager.UpdateClusterQueue(ctx, runningCq); err != nil {
+ if err := manager.UpdateClusterQueue(ctx, runningCq, false); err != nil {
t.Fatalf("Failed to update ClusterQueue: %v", err)
}

3 changes: 3 additions & 0 deletions test/integration/scheduler/scheduler_test.go
@@ -616,12 +616,14 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
})
ginkgo.It("Should re-enqueue by the update event of ClusterQueue", func() {
+ metrics.AdmissionAttemptsTotal.Reset()
wl := testing.MakeWorkload("on-demand-wl", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "6").Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).Should(gomega.Succeed())
util.ExpectWorkloadsToBePending(ctx, k8sClient, wl)
util.ExpectPendingWorkloadsMetric(cq, 0, 1)
util.ExpectReservingActiveWorkloadsMetric(cq, 0)
util.ExpectAdmittedWorkloadsTotalMetric(cq, 0)
+ util.ExpectAdmissionAttemptsMetric(1, 0)

ginkgo.By("updating ClusterQueue")
updatedCq := &kueue.ClusterQueue{}
@@ -645,6 +647,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectPendingWorkloadsMetric(cq, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(cq, 1)
util.ExpectAdmittedWorkloadsTotalMetric(cq, 1)
+ util.ExpectAdmissionAttemptsMetric(1, 1)
})
})

15 changes: 15 additions & 0 deletions test/util/util.go
@@ -371,6 +371,21 @@ func ExpectWorkloadToBeAdmittedAs(ctx context.Context, k8sClient client.Client,
}, Timeout, Interval).Should(gomega.BeComparableTo(admission))
}

+ var attemptStatuses = []metrics.AdmissionResult{metrics.AdmissionResultInadmissible, metrics.AdmissionResultSuccess}
+
+ func ExpectAdmissionAttemptsMetric(pending, admitted int) {
+ vals := []int{pending, admitted}
+
+ for i, status := range attemptStatuses {
+ metric := metrics.AdmissionAttemptsTotal.WithLabelValues(string(status))
+ gomega.EventuallyWithOffset(1, func() int {
+ v, err := testutil.GetCounterMetricValue(metric)
+ gomega.Expect(err).ToNot(gomega.HaveOccurred())
+ return int(v)
+ }, Timeout, Interval).Should(gomega.Equal(vals[i]), "admission_attempts_total with result=%s", status)
+ }
+ }

var pendingStatuses = []string{metrics.PendingStatusActive, metrics.PendingStatusInadmissible}

func ExpectPendingWorkloadsMetric(cq *kueue.ClusterQueue, active, inadmissible int) {