[multikueue/jobset] Use managed-by label support.
trasc committed Mar 20, 2024
1 parent 168eda9 commit 2b25e98
Showing 9 changed files with 127 additions and 45 deletions.
9 changes: 1 addition & 8 deletions hack/multikueue-e2e-test.sh
@@ -27,7 +27,6 @@ export WORKER2_KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME}-worker2

export JOBSET_MANIFEST=https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml
export JOBSET_IMAGE=registry.k8s.io/jobset/jobset:${JOBSET_VERSION}
export JOBSET_CRDS=${ROOT_DIR}/dep-crds/jobset-operator/

source ${SOURCE_DIR}/e2e-common.sh

@@ -90,14 +89,8 @@ function kind_load {


# JOBSET SETUP
# MANAGER
# Only install the CRDs and not the controller to be able to
# have JobSets admitted without execution in the manager cluster.
kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME}
kubectl apply --server-side -f ${JOBSET_CRDS}/*

#WORKERS
docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION
install_jobset $MANAGER_KIND_CLUSTER_NAME
install_jobset $WORKER1_KIND_CLUSTER_NAME
install_jobset $WORKER2_KIND_CLUSTER_NAME
fi
4 changes: 4 additions & 0 deletions pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
@@ -101,3 +101,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
return true
}

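// batch/Job has no managed-by mechanism yet (KEP-4368, referenced in the docs
// change below, tracks adding one), so batch Jobs are always reported as
// managed by Kueue and delegable.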
func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
return true, "", nil
}
23 changes: 20 additions & 3 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter.go
@@ -27,7 +27,8 @@ import (
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/constants"
controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
)

type jobsetAdapter struct{}
@@ -62,9 +63,12 @@ func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client,
if remoteJob.Labels == nil {
remoteJob.Labels = map[string]string{}
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[controllerconstants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin

// Set the JobSet controller as the manager so the remote JobSet executes in the worker cluster.
remoteJob.Labels[jobset.LabelManagedBy] = jobset.JobSetManager

return remoteClient.Create(ctx, &remoteJob)
}

@@ -81,6 +85,19 @@ func (b *jobsetAdapter) KeepAdmissionCheckPending() bool {
return false
}

func (b *jobsetAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
js := jobset.JobSet{}
err := c.Get(ctx, key, &js)
if err != nil {
return false, "", err
}
lblVal := js.Labels[jobset.LabelManagedBy]
if lblVal != constants.KueueName {
return false, fmt.Sprintf("Expecting label %q to be %q not %q", jobset.LabelManagedBy, constants.KueueName, lblVal), nil
}
return true, "", nil
}

var _ multiKueueWatcher = (*jobsetAdapter)(nil)

func (*jobsetAdapter) GetEmptyList() client.ObjectList {
@@ -93,7 +110,7 @@ func (*jobsetAdapter) GetWorkloadKey(o runtime.Object) (types.NamespacedName, er
return types.NamespacedName{}, errors.New("not a jobset")
}

prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[constants.PrebuiltWorkloadLabel]
prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[controllerconstants.PrebuiltWorkloadLabel]
if !hasPrebuiltWorkload {
return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for jobset: %s", klog.KObj(jobSet))
}
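In essence, the new adapter check is a single label comparison. The following standalone sketch — with a hypothetical `isManagedByKueue` helper and the `kueue` manager name inlined, where the real code uses `constants.KueueName` — illustrates the decision the adapter makes:

```go
package main

import (
	"fmt"

	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// kueueName stands in for constants.KueueName from the real code.
const kueueName = "kueue"

// isManagedByKueue mirrors jobsetAdapter.IsJobManagedByKueue: a JobSet is
// delegable to a worker cluster only when its managed-by label names kueue
// as its manager.
func isManagedByKueue(js *jobset.JobSet) (bool, string) {
	if v := js.Labels[jobset.LabelManagedBy]; v != kueueName {
		return false, fmt.Sprintf("Expecting label %q to be %q not %q", jobset.LabelManagedBy, kueueName, v)
	}
	return true, ""
}

func main() {
	unmanaged := &jobset.JobSet{} // no managed-by label at all
	ok, reason := isManagedByKueue(unmanaged)
	fmt.Println(ok, reason) // false Expecting label "alpha.jobset.sigs.k8s.io/managed-by" to be "kueue" not ""

	managed := &jobset.JobSet{}
	managed.Labels = map[string]string{jobset.LabelManagedBy: kueueName}
	ok, reason = isManagedByKueue(managed)
	fmt.Println(ok, reason) // true
}
```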
42 changes: 13 additions & 29 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
@@ -30,7 +30,8 @@ import (

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/constants"
controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/util/slices"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
@@ -46,7 +47,7 @@ func TestWlReconcileJobset(t *testing.T) {
}

baseWorkloadBuilder := utiltesting.MakeWorkload("wl1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace).Label(jobset.LabelManagedBy, constants.KueueName)

cases := map[string]struct {
managersWorkloads []kueue.Workload
@@ -60,7 +61,7 @@
wantWorker1Workloads []kueue.Workload
wantWorker1JobSets []jobset.JobSet
}{
"remote wl with reservation, multikueue AC is marked Ready": {
"wl with unmanaged owner is rejected ": {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
@@ -70,39 +71,22 @@
},

managersJobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},

worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
State: kueue.CheckStateRejected,
Message: `The owner is not managed by Kueue: Expecting label "alpha.jobset.sigs.k8s.io/managed-by" to be "kueue" not ""`,
}).
ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersJobsSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
},

wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},
},
"remote jobset status is changed, the status is copied in the local Jobset ": {
@@ -124,7 +108,7 @@ func TestWlReconcileJobset(t *testing.T) {

worker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
JobsStatus(
jobset.ReplicatedJobStatus{
Name: "replicated-job-1",
@@ -180,7 +164,7 @@
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
JobsStatus(
jobset.ReplicatedJobStatus{
Name: "replicated-job-1",
@@ -215,7 +199,7 @@

worker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
Obj(),
},
@@ -252,7 +236,7 @@
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
Obj(),
},
@@ -279,7 +263,7 @@

worker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
Obj(),
},
14 changes: 14 additions & 0 deletions pkg/controller/admissionchecks/multikueue/workload.go
@@ -73,6 +73,11 @@ type jobAdapter interface {
// kept Pending while the job runs in a worker. This might be needed to keep the managers job
// suspended and not start the execution locally.
KeepAdmissionCheckPending() bool
// IsJobManagedByKueue returns:
// - a bool indicating whether the job object identified by key is managed by kueue and can be delegated,
// - a reason message explaining why it is not,
// - any API error encountered during the check.
IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
}

type wlGroup struct {
@@ -181,6 +186,15 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
}

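// Delegate only when the workload's owner is managed by Kueue; otherwise
// reject the admission check instead of creating remote objects.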
managed, unmanagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
if err != nil {
return reconcile.Result{}, err
}

if !managed {
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unmanagedReason))
}

grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
if err != nil {
return reconcile.Result{}, err
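For adapter authors, the new interface method has two conforming shapes in this commit: the JobSet adapter performs the label check shown earlier, while job types without a managed-by mechanism report everything as managed. A minimal sketch of the latter, using a hypothetical `myAdapter` type (`batchJobAdapter` above takes exactly this shape):

```go
package multikueue

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// myAdapter is a hypothetical adapter for a job type that has no managed-by
// mechanism.
type myAdapter struct{}

// With no managed-by label to inspect, every job of this type is considered
// managed by Kueue and safe to delegate.
func (a *myAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
	return true, "", nil
}
```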
4 changes: 1 addition & 3 deletions site/content/en/docs/concepts/multikueue.md
@@ -58,10 +58,8 @@ Known Limitations:
There is an ongoing effort to overcome these limitations by adding the possibility to disable the reconciliation of some jobs by the Kubernetes `batch/Job` controller. Details in `kubernetes/enhancements` [KEP-4368](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/4368-support-managed-by-label-for-batch-jobs#readme).

### JobSet
Known Limitations:
- Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, you should only install the JobSet CRDs, but not the controller.

An approach similar to the one described for [`batch/Job`](#batchjob) is taken into account to overcome this.
Since unsuspending a JobSet in the manager cluster would lead to its local execution, and updating the status of a local JobSet could conflict with its main controller, MultiKueue expects JobSets submitted to a MultiKueue-enabled ClusterQueue to have the `alpha.jobset.sigs.k8s.io/managed-by` label set to `kueue`.

## Submitting Jobs
In a [configured MultiKueue environment](/docs/tasks/setup_multikueue), you can submit any MultiKueue-supported job to the manager cluster, targeting a ClusterQueue configured for MultiKueue.
71 changes: 69 additions & 2 deletions site/content/en/docs/tasks/run_jobsets.md
@@ -31,7 +31,15 @@ metadata:
kueue.x-k8s.io/queue-name: user-queue
```

### b. Configure the resource needs
### b. MultiKueue

If the JobSet is submitted to a queue using [MultiKueue](/docs/concepts/multikueue), one additional label needs to be specified:

```yaml
alpha.jobset.sigs.k8s.io/managed-by: kueue
```

### c. Configure the resource needs

The resource needs of the workload can be configured in `spec.replicatedJobs`. Keep in mind that the number of replicas, [parallelism](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs), and completions all affect the resource calculations.

@@ -49,7 +57,7 @@ The resource needs of the workload can be configured in the `spec.replicatedJobs
cpu: 1
```

### c. Jobs prioritisation
### d. Jobs prioritisation

The first non-empty [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#priorityclass) among the `spec.replicatedJobs` will be used as the priority.

@@ -65,13 +73,72 @@

The JobSet looks like the following:

### Single Cluster Environment

```yaml
# jobset-sample.yaml
apiVersion: jobset.x-k8s.io/v1alpha2
kind: JobSet
metadata:
generateName: sleep-job-
labels:
kueue.x-k8s.io/queue-name: user-queue
spec:
network:
enableDNSHostnames: false
subdomain: some-subdomain
replicatedJobs:
- name: workers
replicas: 2
template:
spec:
parallelism: 4
completions: 4
backoffLimit: 0
template:
spec:
containers:
- name: sleep
image: busybox
resources:
requests:
cpu: 1
memory: "200Mi"
command:
- sleep
args:
- 100s
- name: driver
template:
spec:
parallelism: 1
completions: 1
backoffLimit: 0
template:
spec:
containers:
- name: sleep
image: busybox
resources:
requests:
cpu: 2
memory: "200Mi"
command:
- sleep
args:
- 100s
```

### MultiKueue Environment

```yaml
# jobset-sample.yaml
apiVersion: jobset.x-k8s.io/v1alpha2
kind: JobSet
metadata:
generateName: sleep-job-
labels:
alpha.jobset.sigs.k8s.io/managed-by: kueue
kueue.x-k8s.io/queue-name: user-queue
spec:
network:
2 changes: 2 additions & 0 deletions test/e2e/multikueue/e2e_test.go
@@ -33,6 +33,7 @@ import (

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
@@ -250,6 +251,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {
// Since it requires 2 CPU in total, this jobset can only be admitted in worker 1.
jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
Queue(managerLq.Name).
Label(jobset.LabelManagedBy, constants.KueueName).
ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
3 changes: 3 additions & 0 deletions test/integration/multikueue/multikueue_test.go
@@ -32,6 +32,7 @@ import (

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
@@ -394,6 +395,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
ginkgo.It("Should run a jobSet on worker if admitted", func() {
jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
Queue(managerLq.Name).
Label(jobset.LabelManagedBy, constants.KueueName).
ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
@@ -573,6 +575,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
ginkgo.It("Should requeue the workload with a delay when the connection to the admitting worker is lost", func() {
jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
Queue(managerLq.Name).
Label(jobset.LabelManagedBy, constants.KueueName).
ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
