From a2776c5057cd5d15a4f74f896c148f419b9eb8ab Mon Sep 17 00:00:00 2001 From: Antonin Stefanutti Date: Thu, 14 Mar 2024 18:42:30 +0100 Subject: [PATCH] Do not default to suspending RayClusters managed by RayJobs --- .../jobs/raycluster/raycluster_webhook.go | 7 ++ .../raycluster/raycluster_webhook_test.go | 99 ++++++++++++++++++- .../controller/jobs/raycluster/suite_test.go | 36 ++++++- 3 files changed, 138 insertions(+), 4 deletions(-) diff --git a/pkg/controller/jobs/raycluster/raycluster_webhook.go b/pkg/controller/jobs/raycluster/raycluster_webhook.go index 6dd58e752c..93d988509d 100644 --- a/pkg/controller/jobs/raycluster/raycluster_webhook.go +++ b/pkg/controller/jobs/raycluster/raycluster_webhook.go @@ -18,6 +18,7 @@ import ( "fmt" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/klog/v2" @@ -60,6 +61,12 @@ func (w *RayClusterWebhook) Default(ctx context.Context, obj runtime.Object) err log := ctrl.LoggerFrom(ctx).WithName("raycluster-webhook") log.V(10).Info("Applying defaults", "job", klog.KObj(job)) + // Do not default suspend a RayCluster that's managed by a RayJob + if owner := metav1.GetControllerOf(job.Object()); owner != nil && jobframework.IsOwnerManagedByKueue(owner) { + log.V(5).Info("RayCluster is owned by a RayJob that's managed by kueue, skipping") + return nil + } + jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName) return nil } diff --git a/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go b/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go index 6b18bc4921..7c9d259088 100644 --- a/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go +++ b/test/integration/controller/jobs/raycluster/raycluster_webhook_test.go @@ -16,11 +16,21 @@ package raycluster import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + 
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" - testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster" + "sigs.k8s.io/controller-runtime/pkg/client" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" + "sigs.k8s.io/kueue/pkg/util/testing" + testingraycluster "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster" + testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob" "sigs.k8s.io/kueue/test/integration/framework" "sigs.k8s.io/kueue/test/util" ) @@ -56,10 +66,95 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() { }) ginkgo.It("the creation doesn't succeed if the queue name is invalid", func() { - job := testingjob.MakeCluster(jobName, ns.Name).Queue("indexed_job").Obj() + job := testingraycluster.MakeCluster(jobName, ns.Name).Queue("indexed_job").Obj() err := k8sClient.Create(ctx, job) gomega.Expect(err).Should(gomega.HaveOccurred()) gomega.Expect(apierrors.IsForbidden(err)).Should(gomega.BeTrue(), "error: %v", err) }) }) + + ginkgo.When("With manageJobsWithoutQueueName enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + DepCRDPaths: []string{rayCrdPath}, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerWithRayClusterAndRayJobControllersSetup(jobframework.WithManageJobsWithoutQueueName(true))) + }) + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "raycluster-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, 
k8sClient, ns)).To(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.It("Should not suspend a cluster if the parent's workload exist and is admitted", func() { + ginkgo.By("Creating the parent job which has a queue name") + parentJob := testingrayjob.MakeJob("parent-job", ns.Name). + Queue("test"). + Suspend(false). + Obj() + gomega.Expect(k8sClient.Create(ctx, parentJob)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: parentJob.Name, Namespace: ns.Name} + gomega.EventuallyWithOffset(1, func() error { + if err := k8sClient.Get(ctx, lookupKey, parentJob); err != nil { + return err + } + parentJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusSuspended + return k8sClient.Status().Update(ctx, parentJob) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("Fetching the workload created for the job") + createdWorkload := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(parentJob.Name, parentJob.UID), Namespace: ns.Name} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("Admitting the workload created for the job") + admission := testing.MakeAdmission("foo").PodSets( + kueue.PodSetAssignment{ + Name: createdWorkload.Spec.PodSets[0].Name, + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "default", + }, + }, kueue.PodSetAssignment{ + Name: createdWorkload.Spec.PodSets[1].Name, + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "default", + }, + }, + ).Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed()) + + 
ginkgo.By("Creating the child cluster") + childCluster := testingraycluster.MakeCluster(jobName, ns.Name). + Suspend(false). + Obj() + gomega.Expect(ctrl.SetControllerReference(parentJob, childCluster, k8sClient.Scheme())).To(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, childCluster)).Should(gomega.Succeed()) + + childClusterKey := client.ObjectKeyFromObject(childCluster) + ginkgo.By("Checking that the child cluster is not suspended") + gomega.Eventually(func() *bool { + gomega.Expect(k8sClient.Get(ctx, childClusterKey, childCluster)).Should(gomega.Succeed()) + return childCluster.Spec.Suspend + }, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false))) + }) + }) }) diff --git a/test/integration/controller/jobs/raycluster/suite_test.go b/test/integration/controller/jobs/raycluster/suite_test.go index 3c815571fd..c140ce32ff 100644 --- a/test/integration/controller/jobs/raycluster/suite_test.go +++ b/test/integration/controller/jobs/raycluster/suite_test.go @@ -21,9 +21,9 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" - config "sigs.k8s.io/kueue/apis/config/v1beta1" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" @@ -31,10 +31,12 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster" + "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/scheduler" + "sigs.k8s.io/kueue/pkg/webhooks" "sigs.k8s.io/kueue/test/integration/framework" - //+kubebuilder:scaffold:imports + // +kubebuilder:scaffold:imports ) var ( @@ -96,3 +98,33 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu gomega.Expect(err).NotTo(gomega.HaveOccurred()) } } + +func managerWithRayClusterAndRayJobControllersSetup(opts ...jobframework.Option) 
framework.ManagerSetup { + return func(mgr manager.Manager, ctx context.Context) { + reconciler := raycluster.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) + err := indexer.Setup(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = raycluster.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = reconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + err = raycluster.SetupRayClusterWebhook(mgr, opts...) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + reconciler = rayjob.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) + err = rayjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(reconciler.SetupWithManager(mgr)).To(gomega.Succeed()) + err = rayjob.SetupRayJobWebhook(mgr, opts...) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) + } +}