diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go
index 865bc0c446..fd9d375ff6 100644
--- a/pkg/controller/jobframework/reconciler.go
+++ b/pkg/controller/jobframework/reconciler.go
@@ -273,11 +273,8 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
 	}
 
 	if wl != nil && apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
-		// Finalize the job if it's finished
-		if _, finished := job.Finished(); finished {
-			if err := r.finalizeJob(ctx, job); err != nil {
-				return ctrl.Result{}, err
-			}
+		if err := r.finalizeJob(ctx, job); err != nil {
+			return ctrl.Result{}, err
 		}
 
 		r.record.Eventf(object, corev1.EventTypeNormal, ReasonFinishedWorkload,
diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go
index 640d7e273c..0d6af24b98 100644
--- a/pkg/controller/jobs/pod/pod_controller_test.go
+++ b/pkg/controller/jobs/pod/pod_controller_test.go
@@ -978,10 +978,11 @@ func TestReconciler(t *testing.T) {
 			},
 			workloadCmpOpts: defaultWorkloadCmpOpts,
 		},
-		"workload is not deleted if one of the pods in the finished group is absent": {
+		"Pods are finalized even if one of the pods in the finished group is absent": {
 			pods: []corev1.Pod{
 				*basePodWrapper.
 					Clone().
+					KueueFinalizer().
 					Label("kueue.x-k8s.io/managed", "true").
 					Group("test-group").
 					GroupTotalCount("2").
diff --git a/site/static/examples/pods-kueue/kueue-pod-group.yaml b/site/static/examples/pods-kueue/kueue-pod-group.yaml
index b3482108b6..29734fd1b4 100644
--- a/site/static/examples/pods-kueue/kueue-pod-group.yaml
+++ b/site/static/examples/pods-kueue/kueue-pod-group.yaml
@@ -16,7 +16,7 @@ spec:
     command: ["sh", "-c", 'echo "hello world from the leader pod" && sleep 3']
     resources:
       requests:
-        cpu: 3
+        cpu: 0.5
 ---
 apiVersion: v1
 kind: Pod
@@ -35,4 +35,4 @@ spec:
     command: ["sh", "-c", 'echo "hello world from the worker pod" && sleep 2']
     resources:
       requests:
-        cpu: 3
+        cpu: 0.5
diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go
index e9918e799d..5a3d1987d2 100644
--- a/test/integration/controller/jobs/pod/pod_controller_test.go
+++ b/test/integration/controller/jobs/pod/pod_controller_test.go
@@ -18,6 +18,7 @@ package pod
 
 import (
 	"fmt"
+	"strconv"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
@@ -1025,6 +1026,66 @@ var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailu
 		})
 	})
 
+	ginkgo.It("Should finalize all Succeeded Pods when deleted", func() {
+		ginkgo.By("Creating pods with queue name")
+		// Use a number of Pods big enough to cause conflicts when removing finalizers >50% of the time.
+		// 7
+		const podCount = 10
+		pods := make([]*corev1.Pod, podCount)
+		for i := range pods {
+			pods[i] = testingpod.MakePod(fmt.Sprintf("test-pod-%d", i), ns.Name).
+				Group("test-group").
+				GroupTotalCount(strconv.Itoa(podCount)).
+				Request(corev1.ResourceCPU, "1").
+				Queue("test-queue").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, pods[i])).Should(gomega.Succeed())
+		}
+
+		ginkgo.By("checking that workload is created for the pod group")
+		wlLookupKey := types.NamespacedName{
+			Namespace: pods[0].Namespace,
+			Name:      "test-group",
+		}
+		createdWorkload := &kueue.Workload{}
+		gomega.Eventually(func() error {
+			return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
+		}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+		ginkgo.By("Admitting workload", func() {
+			admission := testing.MakeAdmission(clusterQueue.Name).PodSets(
+				kueue.PodSetAssignment{
+					Name: "4b0469f7",
+					Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+						corev1.ResourceCPU: "default",
+					},
+					Count: ptr.To[int32](podCount),
+				},
+			).Obj()
+			gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed())
+			util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)
+
+			for i := range pods {
+				util.ExpectPodUnsuspendedWithNodeSelectors(ctx, k8sClient, client.ObjectKeyFromObject(pods[i]), map[string]string{"kubernetes.io/arch": "arm64"})
+			}
+		})
+
+		ginkgo.By("Finishing and deleting Pods", func() {
+			util.SetPodsPhase(ctx, k8sClient, corev1.PodSucceeded, pods...)
+			for i := range pods {
+				gomega.Expect(k8sClient.Delete(ctx, pods[i])).To(gomega.Succeed())
+			}
+
+			gomega.Eventually(func(g gomega.Gomega) {
+				for i := range pods {
+					key := types.NamespacedName{Namespace: ns.Name, Name: fmt.Sprintf("test-pod-%d", i)}
+					g.Expect(k8sClient.Get(ctx, key, &corev1.Pod{})).To(testing.BeNotFoundError())
+				}
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+	})
+
 	ginkgo.It("Should finalize workload if pods are absent", func() {
 		ginkgo.By("Creating pods with queue name")
 		pod1 := testingpod.MakePod("test-pod1", ns.Name).