diff --git a/hack/multikueue-e2e-test.sh b/hack/multikueue-e2e-test.sh
index ccfa8b2b31..7607bc8493 100755
--- a/hack/multikueue-e2e-test.sh
+++ b/hack/multikueue-e2e-test.sh
@@ -78,14 +78,8 @@ function kind_load {
 
     # JOBSET SETUP
 
-    # MANAGER
-    # Only install the CRDs and not the controller to be able to
-    # have JobSets admitted without execution in the manager cluster.
-    kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME}
-    kubectl apply --server-side -f ${JOBSET_CRDS}/*
-
-    #WORKERS
     docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION
+    install_jobset $MANAGER_KIND_CLUSTER_NAME
     install_jobset $WORKER1_KIND_CLUSTER_NAME
     install_jobset $WORKER2_KIND_CLUSTER_NAME
   fi
diff --git a/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go b/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
index 355ba334a0..3323084877 100644
--- a/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
+++ b/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
@@ -101,3 +101,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
 func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
 	return true
 }
+
+func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
+	return true, "", nil
+}
diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter.go
index 26febb05cb..1826eb4886 100644
--- a/pkg/controller/admissionchecks/multikueue/jobset_adapter.go
+++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter.go
@@ -23,11 +23,12 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
 
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
-	"sigs.k8s.io/kueue/pkg/controller/constants"
+	controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
 )
 
 type jobsetAdapter struct{}
@@ -62,9 +63,12 @@ func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client,
 	if remoteJob.Labels == nil {
 		remoteJob.Labels = map[string]string{}
 	}
-	remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
+	remoteJob.Labels[controllerconstants.PrebuiltWorkloadLabel] = workloadName
 	remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin
 
+	// Set the managedBy field so the remote copy is executed by the worker's JobSet controller.
+	remoteJob.Spec.ManagedBy = ptr.To(jobset.JobSetControllerName)
+
 	return remoteClient.Create(ctx, &remoteJob)
 }
@@ -81,6 +85,19 @@ func (b *jobsetAdapter) KeepAdmissionCheckPending() bool {
 	return false
 }
 
+func (b *jobsetAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
+	js := jobset.JobSet{}
+	err := c.Get(ctx, key, &js)
+	if err != nil {
+		return false, "", err
+	}
+	jobsetControllerName := ptr.Deref(js.Spec.ManagedBy, "")
+	if jobsetControllerName != ControllerName {
+		return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", ControllerName, jobsetControllerName), nil
+	}
+	return true, "", nil
+}
+
 var _ multiKueueWatcher = (*jobsetAdapter)(nil)
 
 func (*jobsetAdapter) GetEmptyList() client.ObjectList {
@@ -93,7 +110,7 @@ func (*jobsetAdapter) GetWorkloadKey(o runtime.Object) (types.NamespacedName, er
 		return types.NamespacedName{}, errors.New("not a jobset")
 	}
 
-	prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[constants.PrebuiltWorkloadLabel]
+	prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[controllerconstants.PrebuiltWorkloadLabel]
 	if !hasPrebuiltWorkload {
 		return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for jobset: %s", klog.KObj(jobSet))
 	}
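Reviewer note: the delegation gate added above reduces to a nil-safe comparison of the JobSet's `spec.managedBy` against `ControllerName` (`kueue.x-k8s.io/multikueue`). A minimal standalone sketch of that decision logic; the names `expectedManager` and `checkManagedBy` are illustrative and not part of this change:

```go
package main

import "fmt"

// expectedManager mirrors multikueue.ControllerName, i.e. "kueue.x-k8s.io/multikueue".
const expectedManager = "kueue.x-k8s.io/multikueue"

// checkManagedBy mirrors the core of jobsetAdapter.IsJobManagedByKueue:
// a nil or mismatching spec.managedBy means the JobSet must not be delegated.
func checkManagedBy(managedBy *string) (bool, string) {
	name := ""
	if managedBy != nil {
		name = *managedBy
	}
	if name != expectedManager {
		return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", expectedManager, name)
	}
	return true, ""
}

func main() {
	fmt.Println(checkManagedBy(nil)) // false Expecting spec.managedBy to be "kueue.x-k8s.io/multikueue" not ""
	mk := expectedManager
	fmt.Println(checkManagedBy(&mk)) // true (empty reason)
}
```

Returning the reason as a separate string lets the reconciler surface it verbatim in the AdmissionCheck message, which is exactly what the updated test below asserts.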
diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
index 6dd4ee91e3..ddbd36297d 100644
--- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
+++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package multikueue
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
@@ -30,7 +31,7 @@ import (
 	kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
-	"sigs.k8s.io/kueue/pkg/controller/constants"
+	controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
 	"sigs.k8s.io/kueue/pkg/util/slices"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 	testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
@@ -46,7 +47,7 @@ func TestWlReconcileJobset(t *testing.T) {
 	}
 
 	baseWorkloadBuilder := utiltesting.MakeWorkload("wl1", TestNamespace)
-	baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace)
+	baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace).ManagedBy(ControllerName)
 
 	cases := map[string]struct {
 		managersWorkloads []kueue.Workload
@@ -60,7 +61,7 @@ func TestWlReconcileJobset(t *testing.T) {
 		wantWorker1Workloads []kueue.Workload
 		wantWorker1JobSets   []jobset.JobSet
 	}{
-		"remote wl with reservation, multikueue AC is marked Ready": {
+		"wl with unmanaged owner is rejected": {
 			managersWorkloads: []kueue.Workload{
 				*baseWorkloadBuilder.Clone().
 					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
@@ -70,39 +71,22 @@ func TestWlReconcileJobset(t *testing.T) {
 			},
 
 			managersJobSets: []jobset.JobSet{
-				*baseJobSetBuilder.DeepCopy().Obj(),
+				*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
 			},
-			worker1Workloads: []kueue.Workload{
-				*baseWorkloadBuilder.Clone().
-					ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
-					Obj(),
-			},
 			wantManagersWorkloads: []kueue.Workload{
 				*baseWorkloadBuilder.Clone().
 					AdmissionCheck(kueue.AdmissionCheckState{
 						Name:    "ac1",
-						State:   kueue.CheckStateReady,
-						Message: `The workload got reservation on "worker1"`,
+						State:   kueue.CheckStateRejected,
+						Message: fmt.Sprintf("The owner is not managed by Kueue: Expecting spec.managedBy to be %q not \"\"", ControllerName),
 					}).
 					ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1").
 					ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
 					Obj(),
 			},
 			wantManagersJobsSets: []jobset.JobSet{
-				*baseJobSetBuilder.DeepCopy().Obj(),
-			},
-
-			wantWorker1Workloads: []kueue.Workload{
-				*baseWorkloadBuilder.Clone().
-					ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
-					Obj(),
-			},
-			wantWorker1JobSets: []jobset.JobSet{
-				*baseJobSetBuilder.DeepCopy().
-					Label(constants.PrebuiltWorkloadLabel, "wl1").
-					Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
-					Obj(),
+				*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
 			},
 		},
 		"remote jobset status is changed, the status is copied in the local Jobset ": {
@@ -124,7 +108,7 @@ func TestWlReconcileJobset(t *testing.T) {
 
 			worker1JobSets: []jobset.JobSet{
 				*baseJobSetBuilder.DeepCopy().
-					Label(constants.PrebuiltWorkloadLabel, "wl1").
+					Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
 					JobsStatus(
 						jobset.ReplicatedJobStatus{
 							Name:      "replicated-job-1",
@@ -180,7 +164,7 @@ func TestWlReconcileJobset(t *testing.T) {
 			},
 			wantWorker1JobSets: []jobset.JobSet{
 				*baseJobSetBuilder.DeepCopy().
-					Label(constants.PrebuiltWorkloadLabel, "wl1").
+					Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
 					JobsStatus(
 						jobset.ReplicatedJobStatus{
 							Name:      "replicated-job-1",
@@ -215,7 +199,7 @@ func TestWlReconcileJobset(t *testing.T) {
 
 			worker1JobSets: []jobset.JobSet{
 				*baseJobSetBuilder.DeepCopy().
-					Label(constants.PrebuiltWorkloadLabel, "wl1").
+					Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
 					Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
 					Obj(),
 			},
@@ -252,7 +236,7 @@ func TestWlReconcileJobset(t *testing.T) {
 			},
 			wantWorker1JobSets: []jobset.JobSet{
 				*baseJobSetBuilder.DeepCopy().
-					Label(constants.PrebuiltWorkloadLabel, "wl1").
+					Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
 					Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
 					Obj(),
 			},
@@ -279,7 +263,7 @@ func TestWlReconcileJobset(t *testing.T) {
 
 			worker1JobSets: []jobset.JobSet{
 				*baseJobSetBuilder.DeepCopy().
-					Label(constants.PrebuiltWorkloadLabel, "wl1").
+					Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
 					Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
 					Obj(),
 			},
diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go
index bba63af270..f112aab5a9 100644
--- a/pkg/controller/admissionchecks/multikueue/workload.go
+++ b/pkg/controller/admissionchecks/multikueue/workload.go
@@ -73,6 +73,11 @@ type jobAdapter interface {
 	// kept Pending while the job runs in a worker. This might be needed to keep the managers job
 	// suspended and not start the execution locally.
 	KeepAdmissionCheckPending() bool
+	// IsJobManagedByKueue returns:
+	// a bool indicating whether the job object identified by key is managed by kueue and can be delegated,
+	// a reason message,
+	// and any API error encountered during the check.
+	IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
 }
 
 type wlGroup struct {
@@ -181,6 +186,15 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
 		return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
 	}
 
+	managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
+	if err != nil {
+		return reconcile.Result{}, err
+	}
+
+	if !managed {
+		return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unmanagedReason))
+	}
+
 	grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
 	if err != nil {
 		return reconcile.Result{}, err
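Reviewer note: the reconciler consumes the new three-valued contract as (managed, reason, err), surfacing API errors for retry while converting an unmanaged owner into a permanent AdmissionCheck rejection. A minimal sketch of that mapping, with a hypothetical `managedCheck` function type standing in for the client-bound interface method:

```go
package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// managedCheck is an illustrative stand-in for the jobAdapter.IsJobManagedByKueue
// method defined above; it omits the client parameter for brevity.
type managedCheck func(ctx context.Context, key types.NamespacedName) (bool, string, error)

// acsRejection mirrors how wlReconciler.Reconcile maps an unmanaged owner to the
// AdmissionCheck rejection message; it returns "" when delegation may proceed.
func acsRejection(ctx context.Context, check managedCheck, key types.NamespacedName) (string, error) {
	managed, reason, err := check(ctx, key)
	if err != nil {
		return "", err // API errors are surfaced for retry, not turned into rejections
	}
	if !managed {
		return fmt.Sprintf("The owner is not managed by Kueue: %s", reason), nil
	}
	return "", nil
}

func main() {
	unmanaged := func(context.Context, types.NamespacedName) (bool, string, error) {
		return false, `Expecting spec.managedBy to be "kueue.x-k8s.io/multikueue" not ""`, nil
	}
	msg, _ := acsRejection(context.Background(), unmanaged, types.NamespacedName{Namespace: "ns1", Name: "jobset1"})
	fmt.Println(msg)
}
```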
diff --git a/pkg/util/testingjobs/jobset/wrappers.go b/pkg/util/testingjobs/jobset/wrappers.go
index 7d0ba623b9..1b4a746f88 100644
--- a/pkg/util/testingjobs/jobset/wrappers.go
+++ b/pkg/util/testingjobs/jobset/wrappers.go
@@ -161,3 +161,8 @@ func (j *JobSetWrapper) Condition(c metav1.Condition) *JobSetWrapper {
 	apimeta.SetStatusCondition(&j.Status.Conditions, c)
 	return j
 }
+
+func (j *JobSetWrapper) ManagedBy(c string) *JobSetWrapper {
+	j.Spec.ManagedBy = &c
+	return j
+}
diff --git a/site/content/en/docs/concepts/multikueue.md b/site/content/en/docs/concepts/multikueue.md
index a9faf1a12d..4b873aac10 100644
--- a/site/content/en/docs/concepts/multikueue.md
+++ b/site/content/en/docs/concepts/multikueue.md
@@ -58,10 +58,8 @@ Known Limitations:
 
 There is an ongoing effort to overcome these limitations by adding the possibility to disable the reconciliation of some jobs by the Kubernetes `batch/Job` controller. Details in `kubernetes/enhancements` [KEP-4368](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/4368-support-managed-by-label-for-batch-jobs#readme).
 
 ### JobSet
-Known Limitations:
-- Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, you should only install the JobSet CRDs, but not the controller.
 
-An approach similar to the one described for [`batch/Job`](#batchjob) is taken into account to overcome this.
+Since unsuspending a JobSet in the manager cluster would lead to its local execution, and updating the status of a local JobSet could conflict with its main controller, MultiKueue expects JobSets submitted to a ClusterQueue that uses it to have `spec.managedBy` set to `kueue.x-k8s.io/multikueue`. The JobSet `managedBy` field is available since JobSet v0.5.0.
 
 ## Submitting Jobs
 In a [configured MultiKueue environment](/docs/tasks/manage/setup_multikueue), you can submit any MultiKueue supported job to the Manager cluster, targeting a ClusterQueue configured for Multikueue.
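Reviewer note: to make the concepts-page statement concrete, here is a minimal client-side sketch that sets `spec.managedBy` when building a JobSet programmatically. It assumes the JobSet v0.5.0+ Go types; the queue name and the YAML marshaling are illustrative only:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
	"sigs.k8s.io/yaml"
)

func main() {
	js := jobset.JobSet{
		TypeMeta: metav1.TypeMeta{
			APIVersion: jobset.SchemeGroupVersion.String(),
			Kind:       "JobSet",
		},
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "sleep-job-",
			Labels: map[string]string{
				"kueue.x-k8s.io/queue-name": "user-queue",
			},
		},
		Spec: jobset.JobSetSpec{
			// Hand the JobSet over to MultiKueue instead of the local JobSet controller.
			ManagedBy: ptr.To("kueue.x-k8s.io/multikueue"),
		},
	}
	out, err := yaml.Marshal(js)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}
```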
diff --git a/site/content/en/docs/tasks/run/jobsets.md b/site/content/en/docs/tasks/run/jobsets.md
index 28517794af..beca4e59a3 100644
--- a/site/content/en/docs/tasks/run/jobsets.md
+++ b/site/content/en/docs/tasks/run/jobsets.md
@@ -32,7 +32,15 @@ metadata:
     kueue.x-k8s.io/queue-name: user-queue
 ```
 
-### b. Configure the resource needs
+### b. MultiKueue
+
+If the JobSet is submitted to a queue using [MultiKueue](/docs/concepts/multikueue), one additional label needs to be specified:
+
+```yaml
+  alpha.jobset.sigs.k8s.io/managed-by: kueue
+```
+
+### c. Configure the resource needs
 
 The resource needs of the workload can be configured in the `spec.replicatedJobs`. Should also be taken into account that number of replicas, [parallelism](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs) and completions affect the resource calculations.
 
@@ -50,7 +58,7 @@ The resource needs of the workload can be configured in the `spec.replicatedJobs
           cpu: 1
 ```
 
-### c. Jobs prioritisation
+### d. Jobs prioritisation
 
 The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#priorityclass) of `spec.replicatedJobs` that is not empty will be used as the priority.
@@ -66,6 +74,64 @@ The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-evic
 
 The JobSet looks like the following:
 
+### Single Cluster Environment
+
+```yaml
+# jobset-sample.yaml
+apiVersion: jobset.x-k8s.io/v1alpha2
+kind: JobSet
+metadata:
+  generateName: sleep-job-
+  labels:
+    kueue.x-k8s.io/queue-name: user-queue
+spec:
+  network:
+    enableDNSHostnames: false
+    subdomain: some-subdomain
+  replicatedJobs:
+    - name: workers
+      replicas: 2
+      template:
+        spec:
+          parallelism: 4
+          completions: 4
+          backoffLimit: 0
+          template:
+            spec:
+              containers:
+                - name: sleep
+                  image: busybox
+                  resources:
+                    requests:
+                      cpu: 1
+                      memory: "200Mi"
+                  command:
+                    - sleep
+                  args:
+                    - 100s
+    - name: driver
+      template:
+        spec:
+          parallelism: 1
+          completions: 1
+          backoffLimit: 0
+          template:
+            spec:
+              containers:
+                - name: sleep
+                  image: busybox
+                  resources:
+                    requests:
+                      cpu: 2
+                      memory: "200Mi"
+                  command:
+                    - sleep
+                  args:
+                    - 100s
+```
+
+### MultiKueue Environment
+
 ```yaml
 # jobset-sample.yaml
 apiVersion: jobset.x-k8s.io/v1alpha2
@@ -73,6 +139,7 @@ kind: JobSet
 metadata:
   generateName: sleep-job-
   labels:
+    alpha.jobset.sigs.k8s.io/managed-by: kueue
     kueue.x-k8s.io/queue-name: user-queue
 spec:
   network:
diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go
index ed396755bb..397192e3f2 100644
--- a/test/e2e/multikueue/e2e_test.go
+++ b/test/e2e/multikueue/e2e_test.go
@@ -250,6 +250,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {
 			// Since it requires 2 CPU in total, this jobset can only be admitted in worker 1.
 			jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
 				Queue(managerLq.Name).
+				ManagedBy(multikueue.ControllerName).
 				ReplicatedJobs(
 					testingjobset.ReplicatedJobRequirements{
 						Name:        "replicated-job-1",
diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go
index 6f9ec2040c..6ac25042f1 100644
--- a/test/integration/multikueue/multikueue_test.go
+++ b/test/integration/multikueue/multikueue_test.go
@@ -394,6 +394,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
 		ginkgo.It("Should run a jobSet on worker if admitted", func() {
 			jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
 				Queue(managerLq.Name).
+				ManagedBy(multikueue.ControllerName).
 				ReplicatedJobs(
 					testingjobset.ReplicatedJobRequirements{
 						Name:        "replicated-job-1",
@@ -573,6 +574,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
 		ginkgo.It("Should requeue the workload with a delay when the connection to the admitting worker is lost", func() {
 			jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
 				Queue(managerLq.Name).
+				ManagedBy(multikueue.ControllerName).
 				ReplicatedJobs(
 					testingjobset.ReplicatedJobRequirements{
 						Name:        "replicated-job-1",
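Reviewer note: the new `ManagedBy` wrapper composes with the existing builder chain exactly as the updated e2e and integration tests use it. A minimal usage sketch; the namespace and queue name are placeholders:

```go
package main

import (
	"fmt"

	"k8s.io/utils/ptr"
	testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
)

func main() {
	// Build a JobSet the way the updated tests do, marking it for MultiKueue delegation.
	js := testingjobset.MakeJobSet("job-set", "default").
		Queue("user-queue").
		ManagedBy("kueue.x-k8s.io/multikueue").
		Obj()
	fmt.Println(ptr.Deref(js.Spec.ManagedBy, "<unset>")) // kueue.x-k8s.io/multikueue
}
```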