diff --git a/hack/multikueue-e2e-test.sh b/hack/multikueue-e2e-test.sh index 1d14f0fc99..45e6ec117d 100755 --- a/hack/multikueue-e2e-test.sh +++ b/hack/multikueue-e2e-test.sh @@ -27,7 +27,6 @@ export WORKER2_KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME}-worker2 export JOBSET_MANIFEST=https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml export JOBSET_IMAGE=registry.k8s.io/jobset/jobset:${JOBSET_VERSION} -export JOBSET_CRDS=${ROOT_DIR}/dep-crds/jobset-operator/ source ${SOURCE_DIR}/e2e-common.sh @@ -90,14 +89,8 @@ function kind_load { # JOBSET SETUP - # MANAGER - # Only install the CRDs and not the controller to be able to - # have JobSets admitted without execution in the manager cluster. - kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME} - kubectl apply --server-side -f ${JOBSET_CRDS}/* - - #WORKERS docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION + install_jobset $MANAGER_KIND_CLUSTER_NAME install_jobset $WORKER1_KIND_CLUSTER_NAME install_jobset $WORKER2_KIND_CLUSTER_NAME fi diff --git a/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go b/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go index 355ba334a0..3323084877 100644 --- a/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go +++ b/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go @@ -101,3 +101,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c func (b *batchJobAdapter) KeepAdmissionCheckPending() bool { return true } + +func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) { + return true, "", nil +} diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter.go index 26febb05cb..8ec48835a2 100644 --- a/pkg/controller/admissionchecks/multikueue/jobset_adapter.go +++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter.go @@ 
-27,7 +27,8 @@ import ( jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" - "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/constants" + controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants" ) type jobsetAdapter struct{} @@ -62,9 +63,12 @@ func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client, if remoteJob.Labels == nil { remoteJob.Labels = map[string]string{} } - remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName + remoteJob.Labels[controllerconstants.PrebuiltWorkloadLabel] = workloadName remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin + // set the manager + remoteJob.Labels[jobset.LabelManagedBy] = jobset.JobSetManager + return remoteClient.Create(ctx, &remoteJob) } @@ -81,6 +85,19 @@ func (b *jobsetAdapter) KeepAdmissionCheckPending() bool { return false } +func (b *jobsetAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) { + js := jobset.JobSet{} + err := c.Get(ctx, key, &js) + if err != nil { + return false, "", err + } + lblVal := js.Labels[jobset.LabelManagedBy] + if lblVal != constants.KueueName { + return false, fmt.Sprintf("Expecting label %q to be %q not %q", jobset.LabelManagedBy, constants.KueueName, lblVal), nil + } + return true, "", nil +} + var _ multiKueueWatcher = (*jobsetAdapter)(nil) func (*jobsetAdapter) GetEmptyList() client.ObjectList { @@ -93,7 +110,7 @@ func (*jobsetAdapter) GetWorkloadKey(o runtime.Object) (types.NamespacedName, er return types.NamespacedName{}, errors.New("not a jobset") } - prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[constants.PrebuiltWorkloadLabel] + prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[controllerconstants.PrebuiltWorkloadLabel] if !hasPrebuiltWorkload { return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for jobset: %s", klog.KObj(jobSet)) } diff --git 
a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go index 6dd4ee91e3..0a05c1ffeb 100644 --- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go +++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go @@ -30,7 +30,8 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" - "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/constants" + controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset" @@ -46,7 +47,7 @@ func TestWlReconcileJobset(t *testing.T) { } baseWorkloadBuilder := utiltesting.MakeWorkload("wl1", TestNamespace) - baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace) + baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace).Label(jobset.LabelManagedBy, constants.KueueName) cases := map[string]struct { managersWorkloads []kueue.Workload @@ -60,7 +61,7 @@ func TestWlReconcileJobset(t *testing.T) { wantWorker1Workloads []kueue.Workload wantWorker1JobSets []jobset.JobSet }{ - "remote wl with reservation, multikueue AC is marked Ready": { + "wl with unmanaged owner is rejected ": { managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). @@ -70,39 +71,22 @@ func TestWlReconcileJobset(t *testing.T) { }, managersJobSets: []jobset.JobSet{ - *baseJobSetBuilder.DeepCopy().Obj(), + *testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(), }, - worker1Workloads: []kueue.Workload{ - *baseWorkloadBuilder.Clone(). - ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). - Obj(), - }, wantManagersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). 
AdmissionCheck(kueue.AdmissionCheckState{ Name: "ac1", - State: kueue.CheckStateReady, - Message: `The workload got reservation on "worker1"`, + State: kueue.CheckStateRejected, + Message: `The owner is not managed by Kueue: Expecting label "alpha.jobset.sigs.k8s.io/managed-by" to be "kueue" not ""`, }). ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1"). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). Obj(), }, wantManagersJobsSets: []jobset.JobSet{ - *baseJobSetBuilder.DeepCopy().Obj(), - }, - - wantWorker1Workloads: []kueue.Workload{ - *baseWorkloadBuilder.Clone(). - ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). - Obj(), - }, - wantWorker1JobSets: []jobset.JobSet{ - *baseJobSetBuilder.DeepCopy(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). - Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). - Obj(), + *testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(), }, }, "remote jobset status is changed, the status is copied in the local Jobset ": { @@ -124,7 +108,7 @@ func TestWlReconcileJobset(t *testing.T) { worker1JobSets: []jobset.JobSet{ *baseJobSetBuilder.DeepCopy(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(controllerconstants.PrebuiltWorkloadLabel, "wl1"). JobsStatus( jobset.ReplicatedJobStatus{ Name: "replicated-job-1", @@ -180,7 +164,7 @@ func TestWlReconcileJobset(t *testing.T) { }, wantWorker1JobSets: []jobset.JobSet{ *baseJobSetBuilder.DeepCopy(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(controllerconstants.PrebuiltWorkloadLabel, "wl1"). JobsStatus( jobset.ReplicatedJobStatus{ Name: "replicated-job-1", @@ -215,7 +199,7 @@ func TestWlReconcileJobset(t *testing.T) { worker1JobSets: []jobset.JobSet{ *baseJobSetBuilder.DeepCopy(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(controllerconstants.PrebuiltWorkloadLabel, "wl1"). Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}). 
Obj(), }, @@ -252,7 +236,7 @@ func TestWlReconcileJobset(t *testing.T) { }, wantWorker1JobSets: []jobset.JobSet{ *baseJobSetBuilder.DeepCopy(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(controllerconstants.PrebuiltWorkloadLabel, "wl1"). Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}). Obj(), }, @@ -279,7 +263,7 @@ func TestWlReconcileJobset(t *testing.T) { worker1JobSets: []jobset.JobSet{ *baseJobSetBuilder.DeepCopy(). - Label(constants.PrebuiltWorkloadLabel, "wl1"). + Label(controllerconstants.PrebuiltWorkloadLabel, "wl1"). Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}). Obj(), }, diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index bba63af270..f112aab5a9 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -73,6 +73,11 @@ type jobAdapter interface { // kept Pending while the job runs in a worker. This might be needed to keep the managers job // suspended and not start the execution locally. KeepAdmissionCheckPending() bool + // IsJobManagedByKueue returns + // a bool indicating if the job object identified by key is managed by kueue and can be delegated. 
+ // a reason message + // any API error encountered during the check + IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error) } type wlGroup struct { @@ -181,6 +186,15 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage) } + managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace}) + if err != nil { + return reconcile.Result{}, err + } + + if !managed { + return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unmanagedReason)) + } + grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name) if err != nil { return reconcile.Result{}, err } diff --git a/site/content/en/docs/concepts/multikueue.md b/site/content/en/docs/concepts/multikueue.md index 12377f69b5..cab1f4a3be 100644 --- a/site/content/en/docs/concepts/multikueue.md +++ b/site/content/en/docs/concepts/multikueue.md @@ -58,10 +58,8 @@ Known Limitations: There is an ongoing effort to overcome these limitations by adding the possibility to disable the reconciliation of some jobs by the Kubernetes `batch/Job` controller. Details in `kubernetes/enhancements` [KEP-4368](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/4368-support-managed-by-label-for-batch-jobs#readme). ### JobSet -Known Limitations: -- Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, you should only install the JobSet CRDs, but not the controller. -An approach similar to the one described for [`batch/Job`](#batchjob) is taken into account to overcome this.
+Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, MultiKueue expects the JobSets submitted to a ClusterQueue using it to have `alpha.jobset.sigs.k8s.io/managed-by` label set to `kueue`. ## Submitting Jobs In a [configured MultiKueue environemnt](/docs/tasks/setup_multikueue), you can submit any MultiKueue supported job to the Manager cluster, targeting a ClusterQueue configured for Multikueue. diff --git a/site/content/en/docs/tasks/run_jobsets.md b/site/content/en/docs/tasks/run_jobsets.md index b04a2e0f7c..ad539da104 100644 --- a/site/content/en/docs/tasks/run_jobsets.md +++ b/site/content/en/docs/tasks/run_jobsets.md @@ -31,7 +31,15 @@ metadata: kueue.x-k8s.io/queue-name: user-queue ``` -### b. Configure the resource needs +### b. MultiKueue + +If the JobSet is submitted to a queue using [MultiKueue](/docs/concepts/multikueue) one additional label needs to be specified. + +```yaml + alpha.jobset.sigs.k8s.io/managed-by: kueue +``` + +### c. Configure the resource needs The resource needs of the workload can be configured in the `spec.replicatedJobs`. Should also be taken into account that number of replicas, [parallelism](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs) and completions affect the resource calculations. @@ -49,7 +57,7 @@ The resource needs of the workload can be configured in the `spec.replicatedJobs cpu: 1 ``` -### c. Jobs prioritisation +### d. Jobs prioritisation The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#priorityclass) of `spec.replicatedJobs` that is not empty will be used as the priority. 
@@ -65,6 +73,64 @@ The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-evi The JobSet looks like the following: +### Single Cluster Environment + +```yaml +# jobset-sample.yaml +apiVersion: jobset.x-k8s.io/v1alpha2 +kind: JobSet +metadata: + generateName: sleep-job- + labels: + kueue.x-k8s.io/queue-name: user-queue +spec: + network: + enableDNSHostnames: false + subdomain: some-subdomain + replicatedJobs: + - name: workers + replicas: 2 + template: + spec: + parallelism: 4 + completions: 4 + backoffLimit: 0 + template: + spec: + containers: + - name: sleep + image: busybox + resources: + requests: + cpu: 1 + memory: "200Mi" + command: + - sleep + args: + - 100s + - name: driver + template: + spec: + parallelism: 1 + completions: 1 + backoffLimit: 0 + template: + spec: + containers: + - name: sleep + image: busybox + resources: + requests: + cpu: 2 + memory: "200Mi" + command: + - sleep + args: + - 100s +``` + +### MultiKueue Environment + ```yaml # jobset-sample.yaml apiVersion: jobset.x-k8s.io/v1alpha2 @@ -72,6 +138,7 @@ kind: JobSet metadata: generateName: sleep-job- labels: + alpha.jobset.sigs.k8s.io/managed-by: kueue kueue.x-k8s.io/queue-name: user-queue spec: network: diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go index d4d2f107d3..e95a76aca7 100644 --- a/test/e2e/multikueue/e2e_test.go +++ b/test/e2e/multikueue/e2e_test.go @@ -33,6 +33,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" @@ -250,6 +251,7 @@ var _ = ginkgo.Describe("MultiKueue", func() { // Since it requires 2 CPU in total, this jobset can only be admitted in worker 1. jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name). Queue(managerLq.Name). 
+ Label(jobset.LabelManagedBy, constants.KueueName). ReplicatedJobs( testingjobset.ReplicatedJobRequirements{ Name: "replicated-job-1", diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 13b05e0b8c..f389d2ba23 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -32,6 +32,7 @@ import ( kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" @@ -394,6 +395,7 @@ var _ = ginkgo.Describe("Multikueue", func() { ginkgo.It("Should run a jobSet on worker if admitted", func() { jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name). Queue(managerLq.Name). + Label(jobset.LabelManagedBy, constants.KueueName). ReplicatedJobs( testingjobset.ReplicatedJobRequirements{ Name: "replicated-job-1", @@ -573,6 +575,7 @@ var _ = ginkgo.Describe("Multikueue", func() { ginkgo.It("Should requeue the workload with a delay when the connection to the admitting worker is lost", func() { jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name). Queue(managerLq.Name). + Label(jobset.LabelManagedBy, constants.KueueName). ReplicatedJobs( testingjobset.ReplicatedJobRequirements{ Name: "replicated-job-1",