From 768903169f3cb39f93640dde66d348ef27ef9b1e Mon Sep 17 00:00:00 2001
From: Antonin Stefanutti
Date: Fri, 15 Mar 2024 11:06:00 +0100
Subject: [PATCH] Do not default to suspending a job whose parent is already managed by Kueue (#1846)

* Do not default to suspending a job whose parent is already managed by Kueue

* Better test assertions
---
 pkg/controller/jobframework/defaults.go       |  9 ++
 .../jobs/raycluster/raycluster_webhook.go     |  1 -
 .../raycluster/raycluster_webhook_test.go     | 97 ++++++++++++++++++-
 .../controller/jobs/raycluster/suite_test.go  | 34 ++++++-
 4 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/pkg/controller/jobframework/defaults.go b/pkg/controller/jobframework/defaults.go
index 8d6951c318..21421f53e1 100644
--- a/pkg/controller/jobframework/defaults.go
+++ b/pkg/controller/jobframework/defaults.go
@@ -13,7 +13,16 @@ limitations under the License.
 
 package jobframework
 
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
 func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
+	// Do not default suspend a job whose owner is already managed by Kueue
+	if owner := metav1.GetControllerOf(job.Object()); owner != nil && IsOwnerManagedByKueue(owner) {
+		return
+	}
+
 	if QueueName(job) != "" || manageJobsWithoutQueueName {
 		if !job.IsSuspended() {
 			job.Suspend()
diff --git a/pkg/controller/jobs/raycluster/raycluster_webhook.go b/pkg/controller/jobs/raycluster/raycluster_webhook.go
index 6dd58e752c..4e96f7cb0d 100644
--- a/pkg/controller/jobs/raycluster/raycluster_webhook.go
+++ b/pkg/controller/jobs/raycluster/raycluster_webhook.go
@@ -59,7 +59,6 @@ func (w *RayClusterWebhook) Default(ctx context.Context, obj runtime.Object) err
 	job := fromObject(obj)
 	log := ctrl.LoggerFrom(ctx).WithName("raycluster-webhook")
 	log.V(10).Info("Applying defaults", "job", klog.KObj(job))
-
 	jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName)
 	return nil
 }
diff --git a/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go b/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go
index 6b18bc4921..29cc8eaf3f 100644
--- a/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go
+++ b/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go
@@ -16,11 +16,21 @@ package raycluster
 import (
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
+	rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/utils/ptr"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 
-	testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/pkg/controller/jobframework"
+	workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
+	"sigs.k8s.io/kueue/pkg/util/testing"
+	testingraycluster "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster"
+	testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
 	"sigs.k8s.io/kueue/test/integration/framework"
 	"sigs.k8s.io/kueue/test/util"
 )
@@ -56,10 +66,93 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() {
 		})
 
 		ginkgo.It("the creation doesn't succeed if the queue name is invalid", func() {
-			job := testingjob.MakeCluster(jobName, ns.Name).Queue("indexed_job").Obj()
+			job := testingraycluster.MakeCluster(jobName, ns.Name).Queue("indexed_job").Obj()
 			err := k8sClient.Create(ctx, job)
 			gomega.Expect(err).Should(gomega.HaveOccurred())
 			gomega.Expect(apierrors.IsForbidden(err)).Should(gomega.BeTrue(), "error: %v", err)
 		})
 	})
+
+	ginkgo.When("With manageJobsWithoutQueueName enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
+
+		ginkgo.BeforeAll(func() {
+			fwk = &framework.Framework{
+				CRDPath:     crdPath,
+				DepCRDPaths: []string{rayCrdPath},
+				WebhookPath: webhookPath,
+			}
+			cfg = fwk.Init()
+			ctx, k8sClient = fwk.RunManager(cfg, managerWithRayClusterAndRayJobControllersSetup(jobframework.WithManageJobsWithoutQueueName(true)))
+		})
+		ginkgo.BeforeEach(func() {
+			ns = &corev1.Namespace{
+				ObjectMeta: metav1.ObjectMeta{
+					GenerateName: "raycluster-",
+				},
+			}
+			gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
+		})
+
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+		})
+		ginkgo.AfterAll(func() {
+			fwk.Teardown()
+		})
+
+		ginkgo.It("Should not suspend a cluster if the parent's workload exists and is admitted", func() {
+			ginkgo.By("Creating the parent job which has a queue name")
+			parentJob := testingrayjob.MakeJob("parent-job", ns.Name).
+				Queue("test").
+				Suspend(false).
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, parentJob)).Should(gomega.Succeed())
+
+			lookupKey := types.NamespacedName{Name: parentJob.Name, Namespace: ns.Name}
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, lookupKey, parentJob)).To(gomega.Succeed())
+				parentJob.Status.JobDeploymentStatus = rayv1alpha1.JobDeploymentStatusSuspended
+				g.Expect(k8sClient.Status().Update(ctx, parentJob)).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			ginkgo.By("Fetching the workload created for the job")
+			createdWorkload := &kueue.Workload{}
+			wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(parentJob.Name, parentJob.UID), Namespace: ns.Name}
+			gomega.Eventually(func() error {
+				return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			ginkgo.By("Admitting the workload created for the job")
+			admission := testing.MakeAdmission("foo").PodSets(
+				kueue.PodSetAssignment{
+					Name: createdWorkload.Spec.PodSets[0].Name,
+					Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+						corev1.ResourceCPU: "default",
+					},
+				}, kueue.PodSetAssignment{
+					Name: createdWorkload.Spec.PodSets[1].Name,
+					Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+						corev1.ResourceCPU: "default",
+					},
+				},
+			).Obj()
+			gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).To(gomega.Succeed())
+			util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)
+			gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
+
+			ginkgo.By("Creating the child cluster")
+			childCluster := testingraycluster.MakeCluster(jobName, ns.Name).
+				Suspend(false).
+				Obj()
+			gomega.Expect(ctrl.SetControllerReference(parentJob, childCluster, k8sClient.Scheme())).To(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, childCluster)).To(gomega.Succeed())
+
+			ginkgo.By("Checking that the child cluster is not suspended")
+			childClusterKey := client.ObjectKeyFromObject(childCluster)
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, childClusterKey, childCluster)).To(gomega.Succeed())
+				g.Expect(childCluster.Spec.Suspend).To(gomega.Equal(ptr.To(false)))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+	})
 })
diff --git a/test/integration/controller/jobs/raycluster/suite_test.go b/test/integration/controller/jobs/raycluster/suite_test.go
index 3c815571fd..d862e317dc 100644
--- a/test/integration/controller/jobs/raycluster/suite_test.go
+++ b/test/integration/controller/jobs/raycluster/suite_test.go
@@ -31,10 +31,11 @@ import (
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
 	"sigs.k8s.io/kueue/pkg/controller/jobframework"
 	"sigs.k8s.io/kueue/pkg/controller/jobs/raycluster"
+	"sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
 	"sigs.k8s.io/kueue/pkg/queue"
 	"sigs.k8s.io/kueue/pkg/scheduler"
+	"sigs.k8s.io/kueue/pkg/webhooks"
 	"sigs.k8s.io/kueue/test/integration/framework"
-	//+kubebuilder:scaffold:imports
 )
 
 var (
@@ -96,3 +97,34 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
 		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	}
 }
+
+func managerWithRayClusterAndRayJobControllersSetup(opts ...jobframework.Option) framework.ManagerSetup {
+	return func(mgr manager.Manager, ctx context.Context) {
+		reconciler := raycluster.NewReconciler(
+			mgr.GetClient(),
+			mgr.GetEventRecorderFor(constants.JobControllerName),
+			opts...)
+		err := indexer.Setup(ctx, mgr.GetFieldIndexer())
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		err = raycluster.SetupIndexes(ctx, mgr.GetFieldIndexer())
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		err = reconciler.SetupWithManager(mgr)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		err = raycluster.SetupRayClusterWebhook(mgr, opts...)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+		reconciler = rayjob.NewReconciler(
+			mgr.GetClient(),
+			mgr.GetEventRecorderFor(constants.JobControllerName),
+			opts...)
+		err = rayjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		err = reconciler.SetupWithManager(mgr)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		err = rayjob.SetupRayJobWebhook(mgr, opts...)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+		failedWebhook, err := webhooks.Setup(mgr)
+		gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
+	}
+}
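
For reviewers, a minimal standalone sketch (not part of the commit) of the behaviour the new early return in ApplyDefaultForSuspend gives: a child object whose controller owner is already managed by Kueue keeps its suspend value instead of being force-suspended by the defaulting webhook. It assumes only the k8s.io/apimachinery and k8s.io/utils modules; isOwnerManagedByKueue below is a hypothetical stand-in for jobframework.IsOwnerManagedByKueue, which in Kueue checks the owner reference against the registered job integrations.

// sketch.go - illustrative only, not part of the patch above.
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// isOwnerManagedByKueue is a hypothetical stand-in for
// jobframework.IsOwnerManagedByKueue; the real helper checks the owner's
// group/version/kind against Kueue's registered integrations.
func isOwnerManagedByKueue(owner *metav1.OwnerReference) bool {
	return owner.APIVersion == "ray.io/v1alpha1" && owner.Kind == "RayJob"
}

func main() {
	// A RayCluster created by a RayJob carries a controller owner reference
	// pointing back at that RayJob.
	child := &metav1.ObjectMeta{
		Name: "child-cluster",
		OwnerReferences: []metav1.OwnerReference{{
			APIVersion: "ray.io/v1alpha1",
			Kind:       "RayJob",
			Name:       "parent-job",
			Controller: ptr.To(true),
		}},
	}

	// Mirrors the early return added to ApplyDefaultForSuspend: when the
	// controller owner is already managed by Kueue, suspend defaulting is
	// skipped and the parent's workload drives suspension instead.
	if owner := metav1.GetControllerOf(child); owner != nil && isOwnerManagedByKueue(owner) {
		fmt.Println("skip suspend defaulting: parent", owner.Name, "is managed by Kueue")
		return
	}
	fmt.Println("no Kueue-managed owner: suspend defaulting would apply")
}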