diff --git a/hack/multikueue-e2e-test.sh b/hack/multikueue-e2e-test.sh
index ccfa8b2b31..7607bc8493 100755
--- a/hack/multikueue-e2e-test.sh
+++ b/hack/multikueue-e2e-test.sh
@@ -78,14 +78,8 @@ function kind_load {
 
         # JOBSET SETUP
 
-        # MANAGER
-        # Only install the CRDs and not the controller to be able to
-        # have JobSets admitted without execution in the manager cluster.
-        kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME}
-        kubectl apply --server-side -f ${JOBSET_CRDS}/*
-
-        #WORKERS
         docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION
+        install_jobset $MANAGER_KIND_CLUSTER_NAME
         install_jobset $WORKER1_KIND_CLUSTER_NAME
         install_jobset $WORKER2_KIND_CLUSTER_NAME
     fi
diff --git a/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go b/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
index 355ba334a0..3323084877 100644
--- a/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
+++ b/pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
@@ -101,3 +101,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
 func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
 	return true
 }
+
+func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
+	return true, "", nil
+}
diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter.go
index 26febb05cb..f614bbb9b6 100644
--- a/pkg/controller/admissionchecks/multikueue/jobset_adapter.go
+++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
@@ -65,6 +66,9 @@ func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client,
 	remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
 	remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin
 
+	// Hand the remote copy back to the real JobSet controller in the worker cluster.
+	remoteJob.Spec.ManagedBy = ptr.To(jobset.JobSetControllerName)
+
 	return remoteClient.Create(ctx, &remoteJob)
 }
 
@@ -81,6 +85,19 @@ func (b *jobsetAdapter) KeepAdmissionCheckPending() bool {
 	return false
 }
 
+func (b *jobsetAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
+	js := jobset.JobSet{}
+	err := c.Get(ctx, key, &js)
+	if err != nil {
+		return false, "", err
+	}
+	jobsetControllerName := ptr.Deref(js.Spec.ManagedBy, "")
+	if jobsetControllerName != ControllerName {
+		return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", ControllerName, jobsetControllerName), nil
+	}
+	return true, "", nil
+}
+
 var _ multiKueueWatcher = (*jobsetAdapter)(nil)
 
 func (*jobsetAdapter) GetEmptyList() client.ObjectList {
diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
index 6dd4ee91e3..a220f601da 100644
--- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
+++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package multikueue
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
@@ -46,7 +47,7 @@ func TestWlReconcileJobset(t *testing.T) {
 	}
 
 	baseWorkloadBuilder := utiltesting.MakeWorkload("wl1", TestNamespace)
-	baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace)
+	baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace).ManagedBy(ControllerName)
 
 	cases := map[string]struct {
 		managersWorkloads []kueue.Workload
@@ -60,7 +61,7 @@ func TestWlReconcileJobset(t *testing.T) {
 		wantWorker1Workloads []kueue.Workload
 		wantWorker1JobSets   []jobset.JobSet
 	}{
-		"remote wl with reservation, multikueue AC is marked Ready": {
+		"wl with unmanaged owner is rejected ": {
 			managersWorkloads: []kueue.Workload{
 				*baseWorkloadBuilder.Clone().
 					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
@@ -70,39 +71,22 @@ func TestWlReconcileJobset(t *testing.T) {
 			},
 
 			managersJobSets: []jobset.JobSet{
-				*baseJobSetBuilder.DeepCopy().Obj(),
+				*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
 			},
 
-			worker1Workloads: []kueue.Workload{
-				*baseWorkloadBuilder.Clone().
-					ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
-					Obj(),
-			},
 			wantManagersWorkloads: []kueue.Workload{
 				*baseWorkloadBuilder.Clone().
 					AdmissionCheck(kueue.AdmissionCheckState{
 						Name:    "ac1",
-						State:   kueue.CheckStateReady,
-						Message: `The workload got reservation on "worker1"`,
+						State:   kueue.CheckStateRejected,
+						Message: fmt.Sprintf("The owner is not managed by Kueue: Expecting spec.managedBy to be %q not \"\"", ControllerName),
 					}).
 					ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1").
 					ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
 					Obj(),
 			},
 			wantManagersJobsSets: []jobset.JobSet{
-				*baseJobSetBuilder.DeepCopy().Obj(),
-			},
-
-			wantWorker1Workloads: []kueue.Workload{
-				*baseWorkloadBuilder.Clone().
-					ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
-					Obj(),
-			},
-			wantWorker1JobSets: []jobset.JobSet{
-				*baseJobSetBuilder.DeepCopy().
-					Label(constants.PrebuiltWorkloadLabel, "wl1").
-					Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
-					Obj(),
+				*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
 			},
 		},
 		"remote jobset status is changed, the status is copied in the local Jobset ": {
diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go
index 7862a00fdd..07b37a1428 100644
--- a/pkg/controller/admissionchecks/multikueue/workload.go
+++ b/pkg/controller/admissionchecks/multikueue/workload.go
@@ -73,6 +73,11 @@ type jobAdapter interface {
 	// kept Pending while the job runs in a worker. This might be needed to keep the managers job
 	// suspended and not start the execution locally.
 	KeepAdmissionCheckPending() bool
+	// IsJobManagedByKueue returns:
+	// - a bool indicating if the job object identified by key is managed by kueue and can be delegated.
+	// - a reason indicating why the job is not managed by Kueue
+	// - any API error encountered during the check
+	IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
 }
 
 type wlGroup struct {
@@ -135,7 +140,7 @@ func (g *wlGroup) RemoveRemoteObjects(ctx context.Context, cluster string) error
 
 	if controllerutil.RemoveFinalizer(remWl, kueue.ResourceInUseFinalizerName) {
 		if err := g.remoteClients[cluster].client.Update(ctx, remWl); err != nil {
-			return fmt.Errorf("removing remote workloads finalizeer: %w", err)
+			return fmt.Errorf("removing remote workloads finalizer: %w", err)
 		}
 	}
 
@@ -159,19 +164,54 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
 	// 1. use a finalizer
 	// 2. try to trigger the remote deletion from an event filter.
 
-	grp, err := a.readGroup(ctx, wl)
+	mkAc, err := a.multikueueAC(ctx, wl)
 	if err != nil {
 		return reconcile.Result{}, err
 	}
 
-	if grp == nil {
+	if mkAc == nil || mkAc.State == kueue.CheckStateRejected {
 		log.V(2).Info("Skip Workload")
 		return reconcile.Result{}, nil
 	}
 
+	adapter, owner := a.adapter(wl)
+	if adapter == nil {
+		// Reject the workload since there is no chance for it to run.
+		var rejectionMessage string
+		if owner != nil {
+			rejectionMessage = fmt.Sprintf("No multikueue adapter found for owner kind %q", schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind).String())
+		} else {
+			rejectionMessage = "No multikueue adapter found"
+		}
+		return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
+	}
+
+	managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
+	if err != nil {
+		return reconcile.Result{}, err
+	}
+
+	if !managed {
+		return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unmanagedReason))
+	}
+
+	grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
+	if err != nil {
+		return reconcile.Result{}, err
+	}
+
 	return a.reconcileGroup(ctx, grp)
 }
 
+func (w *wlReconciler) updateACS(ctx context.Context, wl *kueue.Workload, acs *kueue.AdmissionCheckState, status kueue.CheckState, message string) error {
+	acs.State = status
+	acs.Message = message
+	acs.LastTransitionTime = metav1.NewTime(time.Now())
+	wlPatch := workload.BaseSSAWorkload(wl)
+	workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
+	return w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
+}
+
 func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (map[string]*remoteClient, error) {
 	cfg, err := w.helper.ConfigForAdmissionCheck(ctx, acName)
 	if err != nil {
@@ -192,8 +232,8 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m
 	return clients, nil
 }
 
-func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload) (*wlGroup, error) {
-	relevantChecks, err := admissioncheck.FilterForController(ctx, a.client, local.Status.AdmissionChecks, ControllerName)
+func (w *wlReconciler) multikueueAC(ctx context.Context, local *kueue.Workload) (*kueue.AdmissionCheckState, error) {
+	relevantChecks, err := admissioncheck.FilterForController(ctx, w.client, local.Status.AdmissionChecks, ControllerName)
 	if err != nil {
 		return nil, err
 	}
 
@@ -201,33 +241,30 @@ func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload) (*w
 	if len(relevantChecks) == 0 {
 		return nil, nil
 	}
+	return workload.FindAdmissionCheck(local.Status.AdmissionChecks, relevantChecks[0]), nil
+}
 
-	rClients, err := a.remoteClientsForAC(ctx, relevantChecks[0])
-	if err != nil {
-		return nil, fmt.Errorf("admission check %q: %w", relevantChecks[0], err)
-	}
-
-	// Lookup the adapter.
-	var adapter jobAdapter
-	controllerKey := types.NamespacedName{}
+func (w *wlReconciler) adapter(local *kueue.Workload) (jobAdapter, *metav1.OwnerReference) {
 	if controller := metav1.GetControllerOf(local); controller != nil {
 		adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
-		adapter = adapters[adapterKey]
-		controllerKey.Namespace = local.Namespace
-		controllerKey.Name = controller.Name
+		return adapters[adapterKey], controller
 	}
+	return nil, nil
+}
 
-	if adapter == nil {
-		return nil, nil
+func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload, acName string, adapter jobAdapter, controllerName string) (*wlGroup, error) {
+	rClients, err := a.remoteClientsForAC(ctx, acName)
+	if err != nil {
+		return nil, fmt.Errorf("admission check %q: %w", acName, err)
 	}
 
 	grp := wlGroup{
 		local:         local,
 		remotes:       make(map[string]*kueue.Workload, len(rClients)),
 		remoteClients: rClients,
-		acName:        relevantChecks[0],
+		acName:        acName,
 		jobAdapter:    adapter,
-		controllerKey: controllerKey,
+		controllerKey: types.NamespacedName{Name: controllerName, Namespace: local.Namespace},
 	}
 
 	for remote, rClient := range rClients {
@@ -261,12 +298,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
 	}
 
 	if !workload.HasQuotaReservation(group.local) && acs.State == kueue.CheckStateRetry {
-		acs.State = kueue.CheckStatePending
-		acs.Message = "Requeued"
-		acs.LastTransitionTime = metav1.NewTime(time.Now())
-		wlPatch := workload.BaseSSAWorkload(group.local)
-		workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
-		errs = append(errs, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership))
+		errs = append(errs, a.updateACS(ctx, group.local, acs, kueue.CheckStatePending, "Requeued"))
 	}
 
 	return reconcile.Result{}, errors.Join(errs...)
diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go
index c004d43a23..7b6bdc83b1 100644
--- a/pkg/controller/admissionchecks/multikueue/workload_test.go
+++ b/pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -93,7 +93,7 @@ func TestWlReconcile(t *testing.T) {
 				*baseWorkloadBuilder.Clone().Obj(),
 			},
 		},
-		"unmanaged wl (no parent) is ignored": {
+		"unmanaged wl (no parent) is rejected": {
 			reconcileFor: "wl1",
 			managersWorkloads: []kueue.Workload{
 				*baseWorkloadBuilder.Clone().
@@ -102,9 +102,24 @@ func TestWlReconcile(t *testing.T) {
 					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
 					Obj(),
 			},
+			wantManagersWorkloads: []kueue.Workload{
+				*baseWorkloadBuilder.Clone().
+					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: "No multikueue adapter found"}).
+					Obj(),
+			},
+		},
+		"unmanaged wl (owned by pod) is rejected": {
+			reconcileFor: "wl1",
+			managersWorkloads: []kueue.Workload{
+				*baseWorkloadBuilder.Clone().
+					ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
 					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
 					Obj(),
 			},
+			wantManagersWorkloads: []kueue.Workload{
+				*baseWorkloadBuilder.Clone().
+					ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
+					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: `No multikueue adapter found for owner kind "/v1, Kind=Pod"`}).
+					Obj(),
+			},
 		},
 		"failing to read from a worker": {
 			reconcileFor: "wl1",
diff --git a/pkg/util/testingjobs/jobset/wrappers.go b/pkg/util/testingjobs/jobset/wrappers.go
index 7d0ba623b9..1b4a746f88 100644
--- a/pkg/util/testingjobs/jobset/wrappers.go
+++ b/pkg/util/testingjobs/jobset/wrappers.go
@@ -161,3 +161,8 @@ func (j *JobSetWrapper) Condition(c metav1.Condition) *JobSetWrapper {
 	apimeta.SetStatusCondition(&j.Status.Conditions, c)
 	return j
 }
+
+func (j *JobSetWrapper) ManagedBy(c string) *JobSetWrapper {
+	j.Spec.ManagedBy = &c
+	return j
+}
diff --git a/site/content/en/docs/concepts/multikueue.md b/site/content/en/docs/concepts/multikueue.md
index a9faf1a12d..d45a51f3b0 100644
--- a/site/content/en/docs/concepts/multikueue.md
+++ b/site/content/en/docs/concepts/multikueue.md
@@ -58,10 +58,9 @@ Known Limitations:
 There is an ongoing effort to overcome these limitations by adding the possibility to disable the reconciliation of some jobs by the Kubernetes `batch/Job` controller. Details in `kubernetes/enhancements` [KEP-4368](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/4368-support-managed-by-label-for-batch-jobs#readme).
 
 ### JobSet
-Known Limitations:
-- Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, you should only install the JobSet CRDs, but not the controller.
 
-An approach similar to the one described for [`batch/Job`](#batchjob) is taken into account to overcome this.
+When you want to submit JobSets to a ClusterQueue with a MultiKueue admission check, you should set the `spec.managedBy` field to `kueue.x-k8s.io/multikueue`; otherwise the admission check controller will `Reject` the workload, causing it to be marked as `Finished` with an error indicating the cause.
+The `managedBy` field is available in JobSet v0.5.0 and newer.
 
 ## Submitting Jobs
 In a [configured MultiKueue environment](/docs/tasks/manage/setup_multikueue), you can submit any MultiKueue supported job to the Manager cluster, targeting a ClusterQueue configured for Multikueue.
diff --git a/site/content/en/docs/tasks/run/jobsets.md b/site/content/en/docs/tasks/run/jobsets.md
index 28517794af..e91e48e22c 100644
--- a/site/content/en/docs/tasks/run/jobsets.md
+++ b/site/content/en/docs/tasks/run/jobsets.md
@@ -32,7 +32,11 @@ metadata:
     kueue.x-k8s.io/queue-name: user-queue
 ```
 
-### b. Configure the resource needs
+### b. MultiKueue
+
+If the JobSet is submitted to a queue using [MultiKueue](/docs/concepts/multikueue), its `spec.managedBy` field needs to be set to `kueue.x-k8s.io/multikueue`. Otherwise, its workload will be marked as `Finished` with an error indicating the cause.
+
+### c. Configure the resource needs
 
 The resource needs of the workload can be configured in the `spec.replicatedJobs`. Should also be taken into account that number of replicas, [parallelism](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs) and completions affect the resource calculations.
 
@@ -50,7 +54,7 @@ The resource needs of the workload can be configured in the `spec.replicatedJobs
           cpu: 1
 ```
 
-### c. Jobs prioritisation
+### d. Jobs prioritisation
 
 The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#priorityclass) of `spec.replicatedJobs` that is not empty will be used as the priority.
 
@@ -66,6 +70,64 @@ The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-evi
 
 The JobSet looks like the following:
 
+### Single Cluster Environment
+
+```yaml
+# jobset-sample.yaml
+apiVersion: jobset.x-k8s.io/v1alpha2
+kind: JobSet
+metadata:
+  generateName: sleep-job-
+  labels:
+    kueue.x-k8s.io/queue-name: user-queue
+spec:
+  network:
+    enableDNSHostnames: false
+    subdomain: some-subdomain
+  replicatedJobs:
+  - name: workers
+    replicas: 2
+    template:
+      spec:
+        parallelism: 4
+        completions: 4
+        backoffLimit: 0
+        template:
+          spec:
+            containers:
+            - name: sleep
+              image: busybox
+              resources:
+                requests:
+                  cpu: 1
+                  memory: "200Mi"
+              command:
+              - sleep
+              args:
+              - 100s
+  - name: driver
+    template:
+      spec:
+        parallelism: 1
+        completions: 1
+        backoffLimit: 0
+        template:
+          spec:
+            containers:
+            - name: sleep
+              image: busybox
+              resources:
+                requests:
+                  cpu: 2
+                  memory: "200Mi"
+              command:
+              - sleep
+              args:
+              - 100s
+```
+
+### MultiKueue Environment
+
 ```yaml
 # jobset-sample.yaml
 apiVersion: jobset.x-k8s.io/v1alpha2
 kind: JobSet
@@ -118,6 +180,7 @@ spec:
               - sleep
               args:
               - 100s
+  managedBy: kueue.x-k8s.io/multikueue
 ```
 
 You can run this JobSet with the following commands:
diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go
index ed396755bb..397192e3f2 100644
--- a/test/e2e/multikueue/e2e_test.go
+++ b/test/e2e/multikueue/e2e_test.go
@@ -250,6 +250,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {
 			// Since it requires 2 CPU in total, this jobset can only be admitted in worker 1.
 			jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
 				Queue(managerLq.Name).
+				ManagedBy(multikueue.ControllerName).
 				ReplicatedJobs(
 					testingjobset.ReplicatedJobRequirements{
 						Name:        "replicated-job-1",
diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go
index 6f9ec2040c..6ac25042f1 100644
--- a/test/integration/multikueue/multikueue_test.go
+++ b/test/integration/multikueue/multikueue_test.go
@@ -394,6 +394,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
 		ginkgo.It("Should run a jobSet on worker if admitted", func() {
 			jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
 				Queue(managerLq.Name).
+				ManagedBy(multikueue.ControllerName).
 				ReplicatedJobs(
 					testingjobset.ReplicatedJobRequirements{
 						Name:        "replicated-job-1",
@@ -573,6 +574,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
 		ginkgo.It("Should requeue the workload with a delay when the connection to the admitting worker is lost", func() {
 			jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
 				Queue(managerLq.Name).
+				ManagedBy(multikueue.ControllerName).
 				ReplicatedJobs(
 					testingjobset.ReplicatedJobRequirements{
 						Name:        "replicated-job-1",
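
For adapter authors, the `IsJobManagedByKueue` contract added in this change generalizes beyond JobSet: return `true, "", nil` to delegate (as `batchjob_adapter.go` does unconditionally), or `false` plus a reason to have the reconciler mark the admission check `Rejected`. Below is a minimal sketch of how an adapter for some other CRD might implement the check, mirroring the `jobsetAdapter` version above; the `mycrd` package, its `MyJob` type, and its `Spec.ManagedBy` field are illustrative assumptions, not part of this PR.

```go
package multikueue

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	mycrd "example.com/mycrd/api/v1" // hypothetical CRD API, for illustration only
)

type myJobAdapter struct{}

// IsJobManagedByKueue reports whether the job identified by key opted in to
// MultiKueue delegation via spec.managedBy, following the jobsetAdapter pattern.
func (a *myJobAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
	job := mycrd.MyJob{}
	if err := c.Get(ctx, key, &job); err != nil {
		return false, "", err
	}
	if name := ptr.Deref(job.Spec.ManagedBy, ""); name != ControllerName {
		// A non-empty reason causes the workload's admission check to be Rejected.
		return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", ControllerName, name), nil
	}
	return true, "", nil
}
```

Adapters for jobs that are never reconciled locally while suspended can keep returning `true, "", nil`, which is exactly what the `batchJobAdapter` does in this diff.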