WaitForPodsReady: Reset the requeueState while reconciling instead of webhook (#1843)

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
tenzen-y committed Mar 14, 2024
1 parent d7df9e5 commit 3adac90
Showing 7 changed files with 31 additions and 57 deletions.
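In short, this commit moves the RequeueState reset out of the mutating webhook and into the Workload reconciler: when a Workload that was evicted by deactivation is re-activated, the reconciler clears Status.RequeueState and patches the status. The following is a minimal sketch of that check, condensed from the pkg/controller/core/workload_controller.go hunk below; the helper names, signatures, and import paths are taken from the diff and the Kueue repository, while the standalone wrapper function itself is illustrative only.

package sketch

import (
	"context"

	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/pkg/workload"
)

// resetRequeueStateIfReactivated clears Status.RequeueState once a Workload
// that was evicted by deactivation (spec.active=false) becomes active again,
// so the requeue backoff starts from scratch on the next eviction.
// It reports whether a status patch was issued. Illustrative wrapper only;
// in the commit this logic lives inline in WorkloadReconciler.Reconcile.
func resetRequeueStateIfReactivated(ctx context.Context, c client.Client, wl *kueue.Workload) (bool, error) {
	if wl.Status.RequeueState != nil &&
		ptr.Deref(wl.Spec.Active, true) &&
		workload.IsEvictedByDeactivation(wl) {
		wl.Status.RequeueState = nil
		return true, workload.ApplyAdmissionStatus(ctx, c, wl, true)
	}
	return false, nil
}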
2 changes: 0 additions & 2 deletions charts/kueue/templates/webhook/webhook.yaml
@@ -287,10 +287,8 @@ webhooks:
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- workloads
- workloads/status
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
2 changes: 0 additions & 2 deletions config/components/webhook/manifests.yaml
@@ -267,10 +267,8 @@ webhooks:
- v1beta1
operations:
- CREATE
- UPDATE
resources:
- workloads
- workloads/status
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
6 changes: 6 additions & 0 deletions pkg/controller/core/workload_controller.go
@@ -149,6 +149,12 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Workload")

// If a deactivated workload is re-activated, we need to reset the RequeueState.
if wl.Status.RequeueState != nil && ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(&wl) {
wl.Status.RequeueState = nil
return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
}

if len(wl.ObjectMeta.OwnerReferences) == 0 && !wl.DeletionTimestamp.IsZero() {
return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, &wl)
}
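
The new check above relies on workload.IsEvictedByDeactivation from Kueue's pkg/workload package. Below is a plausible sketch of what that helper evaluates, inferred from the Evicted condition (Type=WorkloadEvicted, Status=True, Reason=WorkloadEvictedByDeactivation) that the tests in this commit set; the canonical implementation may differ in detail.

package sketch

import (
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// isEvictedByDeactivation reports whether the Workload carries an
// Evicted=True condition whose reason indicates it was evicted because
// spec.active was set to false. Sketch only; see sigs.k8s.io/kueue/pkg/workload
// for the real helper used in the reconciler above.
func isEvictedByDeactivation(wl *kueue.Workload) bool {
	cond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadEvicted)
	return cond != nil &&
		cond.Status == metav1.ConditionTrue &&
		cond.Reason == kueue.WorkloadEvictedByDeactivation
}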
6 changes: 1 addition & 5 deletions pkg/webhooks/workload_webhook.go
@@ -50,7 +50,7 @@ func setupWebhookForWorkload(mgr ctrl.Manager) error {
Complete()
}

// +kubebuilder:webhook:path=/mutate-kueue-x-k8s-io-v1beta1-workload,mutating=true,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads;workloads/status,verbs=create;update,versions=v1beta1,name=mworkload.kb.io,admissionReviewVersions=v1
// +kubebuilder:webhook:path=/mutate-kueue-x-k8s-io-v1beta1-workload,mutating=true,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads,verbs=create,versions=v1beta1,name=mworkload.kb.io,admissionReviewVersions=v1

var _ webhook.CustomDefaulter = &WorkloadWebhook{}

@@ -76,10 +76,6 @@ func (w *WorkloadWebhook) Default(ctx context.Context, obj runtime.Object) error
}
}

// If a deactivated workload is re-activated, we need to reset the RequeueState.
if ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(wl) && workload.HasRequeueState(wl) {
wl.Status.RequeueState = nil
}
return nil
}

16 changes: 0 additions & 16 deletions pkg/webhooks/workload_webhook_test.go
@@ -78,22 +78,6 @@ func TestWorkloadWebhookDefault(t *testing.T) {
},
},
},
"re-activated workload with re-queue state is reset the re-queue state": {
wl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
}).RequeueState(ptr.To[int32](5), ptr.To(metav1.Now())).
Obj(),
wantWl: *testingutil.MakeWorkload(testWorkloadName, testWorkloadNamespace).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
}).
Obj(),
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
25 changes: 24 additions & 1 deletion test/integration/scheduler/podsready/scheduler_test.go
@@ -248,7 +248,7 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() {
// To avoid flakiness, we don't verify if the workload has a QuotaReserved=false with pending reason here.
})

ginkgo.It("Should re-admit a timed out workload and deactivate a workload exceeded the re-queue count limit", func() {
ginkgo.It("Should re-admit a timed out workload and deactivate a workload exceeded the re-queue count limit. After that re-activating a workload", func() {
ginkgo.By("create the 'prod' workload")
prodWl := testing.MakeWorkload("prod", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Obj()
gomega.Expect(k8sClient.Create(ctx, prodWl)).Should(gomega.Succeed())
@@ -276,6 +276,29 @@ var _ = ginkgo.Describe("SchedulerWithWaitForPodsReady", func() {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl))
g.Expect(ptr.Deref(prodWl.Spec.Active, true)).Should(gomega.BeFalse())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("verify the re-activated inactive 'prod' workload re-queue state is reset")
// TODO: Once we move the logic to issue the Eviction with the InactiveWorkload reason, we need to remove the updates below.
// REF: https://github.com/kubernetes-sigs/kueue/issues/1841
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl)).Should(gomega.Succeed())
apimeta.SetStatusCondition(&prodWl.Status.Conditions, metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadEvictedByDeactivation,
Message: "evicted by Test",
})
g.Expect(k8sClient.Status().Update(ctx, prodWl)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), "Job reconciler should add an Evicted condition with InactiveWorkload to the Workload")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl)).Should(gomega.Succeed())
prodWl.Spec.Active = ptr.To(true)
g.Expect(k8sClient.Update(ctx, prodWl)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), "Reactivate inactive Workload")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodWl), prodWl)).Should(gomega.Succeed())
g.Expect(prodWl.Status.RequeueState).Should(gomega.BeNil())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.It("Should unblock admission of new workloads in other ClusterQueues once the admitted workload exceeds timeout", func() {
31 changes: 0 additions & 31 deletions test/integration/webhook/workload_test.go
@@ -82,37 +82,6 @@ var _ = ginkgo.Describe("Workload defaulting webhook", func() {

gomega.Expect(created.Spec.PodSets[0].Name).Should(gomega.Equal(kueue.DefaultPodSetName))
})
ginkgo.It("Should reset re-queue state", func() {
ginkgo.By("Creating a new inactive Workload")
workload := testing.MakeWorkload(workloadName, ns.Name).
Active(false).
Obj()
gomega.Expect(k8sClient.Create(ctx, workload)).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(workload), workload))
workload.Status = kueue.WorkloadStatus{
Conditions: []metav1.Condition{{
Type: kueue.WorkloadEvicted,
Reason: kueue.WorkloadEvictedByDeactivation,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}},
RequeueState: &kueue.RequeueState{
Count: ptr.To[int32](10),
RequeueAt: ptr.To(metav1.Now()),
},
}
g.Expect(k8sClient.Status().Update(ctx, workload)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
ginkgo.By("Activate a Workload")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(workload), workload))
workload.Spec.Active = ptr.To(true)
g.Expect(k8sClient.Update(ctx, workload)).Should(gomega.Succeed())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(workload), workload))
g.Expect(workload.Status.RequeueState).Should(gomega.BeNil(), "re-queue state should be reset")
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})
})

