Skip to content

Commit

Permalink
Do not default to suspending a job whose parent is already managed by Kueue
Browse files Browse the repository at this point in the history
  • Loading branch information
astefanutti committed Mar 14, 2024
1 parent a96927d commit 1e7a6c0
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 4 deletions.
9 changes: 9 additions & 0 deletions pkg/controller/jobframework/defaults.go
Expand Up @@ -13,7 +13,16 @@ limitations under the License.

package jobframework

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func ApplyDefaultForSuspend(job GenericJob, manageJobsWithoutQueueName bool) {
// Do not default suspend a job whose owner is already managed by Kueue
if owner := metav1.GetControllerOf(job.Object()); owner != nil && IsOwnerManagedByKueue(owner) {
return
}

if QueueName(job) != "" || manageJobsWithoutQueueName {
if !job.IsSuspended() {
job.Suspend()
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/jobs/raycluster/raycluster_webhook.go
Expand Up @@ -59,7 +59,6 @@ func (w *RayClusterWebhook) Default(ctx context.Context, obj runtime.Object) err
job := fromObject(obj)
log := ctrl.LoggerFrom(ctx).WithName("raycluster-webhook")
log.V(10).Info("Applying defaults", "job", klog.KObj(job))

jobframework.ApplyDefaultForSuspend(job, w.manageJobsWithoutQueueName)
return nil
}
Expand Down
Expand Up @@ -16,11 +16,21 @@ package raycluster
import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
rayv1alpha1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"

testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster"
"sigs.k8s.io/controller-runtime/pkg/client"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
workloadrayjob "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/util/testing"
testingraycluster "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster"
testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
"sigs.k8s.io/kueue/test/integration/framework"
"sigs.k8s.io/kueue/test/util"
)
Expand Down Expand Up @@ -56,10 +66,95 @@ var _ = ginkgo.Describe("RayCluster Webhook", func() {
})

ginkgo.It("the creation doesn't succeed if the queue name is invalid", func() {
job := testingjob.MakeCluster(jobName, ns.Name).Queue("indexed_job").Obj()
job := testingraycluster.MakeCluster(jobName, ns.Name).Queue("indexed_job").Obj()
err := k8sClient.Create(ctx, job)
gomega.Expect(err).Should(gomega.HaveOccurred())
gomega.Expect(apierrors.IsForbidden(err)).Should(gomega.BeTrue(), "error: %v", err)
})
})

// With manageJobsWithoutQueueName enabled the webhook would normally default
// every job to suspended; this container verifies that a RayCluster whose
// owning RayJob is already managed (and admitted) by Kueue is left running.
ginkgo.When("With manageJobsWithoutQueueName enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {

	ginkgo.BeforeAll(func() {
		fwk = &framework.Framework{
			CRDPath:     crdPath,
			DepCRDPaths: []string{rayCrdPath},
			WebhookPath: webhookPath,
		}
		cfg = fwk.Init()
		// Run both the RayCluster and RayJob controllers so the parent
		// RayJob gets a workload the child cluster's webhook can honor.
		ctx, k8sClient = fwk.RunManager(cfg, managerWithRayClusterAndRayJobControllersSetup(jobframework.WithManageJobsWithoutQueueName(true)))
	})
	ginkgo.BeforeEach(func() {
		// Fresh namespace per spec to isolate created objects.
		ns = &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "raycluster-",
			},
		}
		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
	})

	ginkgo.AfterEach(func() {
		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
	})
	ginkgo.AfterAll(func() {
		fwk.Teardown()
	})

	ginkgo.It("Should not suspend a cluster if the parent's workload exist and is admitted", func() {
		ginkgo.By("Creating the parent job which has a queue name")
		parentJob := testingrayjob.MakeJob("parent-job", ns.Name).
			Queue("test").
			Suspend(false).
			Obj()
		gomega.Expect(k8sClient.Create(ctx, parentJob)).Should(gomega.Succeed())

		lookupKey := types.NamespacedName{Name: parentJob.Name, Namespace: ns.Name}
		// Retry on conflict: re-fetch then update status until it sticks.
		// Fix: use Eventually rather than EventuallyWithOffset(1, ...) —
		// the offset made failure reports point at the caller's frame
		// instead of this line, and is inconsistent with the other polls
		// in this spec.
		gomega.Eventually(func() error {
			if err := k8sClient.Get(ctx, lookupKey, parentJob); err != nil {
				return err
			}
			parentJob.Status.JobDeploymentStatus = rayv1alpha1.JobDeploymentStatusSuspended
			return k8sClient.Status().Update(ctx, parentJob)
		}, util.Timeout, util.Interval).Should(gomega.Succeed())

		ginkgo.By("Fetching the workload created for the job")
		createdWorkload := &kueue.Workload{}
		wlLookupKey := types.NamespacedName{Name: workloadrayjob.GetWorkloadNameForRayJob(parentJob.Name, parentJob.UID), Namespace: ns.Name}
		gomega.Eventually(func() error {
			return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
		}, util.Timeout, util.Interval).Should(gomega.Succeed())

		ginkgo.By("Admitting the workload created for the job")
		// A RayJob workload has two pod sets (head and workers); both need
		// a flavor assignment for admission.
		admission := testing.MakeAdmission("foo").PodSets(
			kueue.PodSetAssignment{
				Name: createdWorkload.Spec.PodSets[0].Name,
				Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
					corev1.ResourceCPU: "default",
				},
			}, kueue.PodSetAssignment{
				Name: createdWorkload.Spec.PodSets[1].Name,
				Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
					corev1.ResourceCPU: "default",
				},
			},
		).Obj()
		gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed())
		util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload)
		gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).Should(gomega.Succeed())

		ginkgo.By("Creating the child cluster")
		// Owner reference to the admitted RayJob is what exempts the
		// cluster from the webhook's default-suspend behavior.
		childCluster := testingraycluster.MakeCluster(jobName, ns.Name).
			Suspend(false).
			Obj()
		gomega.Expect(ctrl.SetControllerReference(parentJob, childCluster, k8sClient.Scheme())).To(gomega.Succeed())
		gomega.Expect(k8sClient.Create(ctx, childCluster)).Should(gomega.Succeed())

		childClusterKey := client.ObjectKeyFromObject(childCluster)
		ginkgo.By("Checking that the child cluster is not suspended")
		gomega.Eventually(func() *bool {
			gomega.Expect(k8sClient.Get(ctx, childClusterKey, childCluster)).Should(gomega.Succeed())
			return childCluster.Spec.Suspend
		}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(false)))
	})
})
})
35 changes: 34 additions & 1 deletion test/integration/controller/jobs/raycluster/suite_test.go
Expand Up @@ -31,10 +31,12 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/raycluster"
"sigs.k8s.io/kueue/pkg/controller/jobs/rayjob"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/scheduler"
"sigs.k8s.io/kueue/pkg/webhooks"
"sigs.k8s.io/kueue/test/integration/framework"
//+kubebuilder:scaffold:imports
// +kubebuilder:scaffold:imports
)

var (
Expand Down Expand Up @@ -96,3 +98,34 @@ func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetu
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}
}

// managerWithRayClusterAndRayJobControllersSetup returns a ManagerSetup that
// wires up BOTH the RayCluster and RayJob integrations (indexes, reconcilers,
// and webhooks) plus the core Kueue indexer, so tests can exercise a
// RayCluster owned by a Kueue-managed RayJob.
func managerWithRayClusterAndRayJobControllersSetup(opts ...jobframework.Option) framework.ManagerSetup {
	return func(mgr manager.Manager, ctx context.Context) {
		// Core field indexes shared by all integrations.
		gomega.Expect(indexer.Setup(ctx, mgr.GetFieldIndexer())).NotTo(gomega.HaveOccurred())

		// RayCluster: indexes, reconciler, webhook.
		clusterReconciler := raycluster.NewReconciler(
			mgr.GetClient(),
			mgr.GetEventRecorderFor(constants.JobControllerName),
			opts...)
		gomega.Expect(raycluster.SetupIndexes(ctx, mgr.GetFieldIndexer())).NotTo(gomega.HaveOccurred())
		gomega.Expect(clusterReconciler.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred())
		gomega.Expect(raycluster.SetupRayClusterWebhook(mgr, opts...)).NotTo(gomega.HaveOccurred())

		// RayJob: indexes, reconciler, webhook.
		jobReconciler := rayjob.NewReconciler(
			mgr.GetClient(),
			mgr.GetEventRecorderFor(constants.JobControllerName),
			opts...)
		gomega.Expect(rayjob.SetupIndexes(ctx, mgr.GetFieldIndexer())).NotTo(gomega.HaveOccurred())
		gomega.Expect(jobReconciler.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred())
		gomega.Expect(rayjob.SetupRayJobWebhook(mgr, opts...)).NotTo(gomega.HaveOccurred())

		// Remaining Kueue webhooks (cluster queues, workloads, ...).
		failedWebhook, err := webhooks.Setup(mgr)
		gomega.Expect(err).NotTo(gomega.HaveOccurred(), "webhook", failedWebhook)
	}
}

0 comments on commit 1e7a6c0

Please sign in to comment.