Skip to content

Commit

Permalink
[multikueue/jobset] Use managedBy field support.
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Apr 16, 2024
1 parent 95ab9e8 commit 21142b9
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 44 deletions.
8 changes: 1 addition & 7 deletions hack/multikueue-e2e-test.sh
Expand Up @@ -78,14 +78,8 @@ function kind_load {


# JOBSET SETUP
# MANAGER
# Only install the CRDs and not the controller to be able to
# have JobSets admitted without execution in the manager cluster.
kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME}
kubectl apply --server-side -f ${JOBSET_CRDS}/*

#WORKERS
docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION
install_jobset $MANAGER_KIND_CLUSTER_NAME
install_jobset $WORKER1_KIND_CLUSTER_NAME
install_jobset $WORKER2_KIND_CLUSTER_NAME
fi
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
Expand Up @@ -101,3 +101,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
// KeepAdmissionCheckPending returns true: the multikueue admission check is
// kept Pending while the batch Job runs on a worker, so the manager's copy of
// the Job stays suspended and is not executed locally.
func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
return true
}

// IsJobManagedByKueue always reports the batch Job as managed and delegable
// (no reason, no error). batch/Job has no managedBy mechanism yet — support is
// still being worked on upstream (KEP-4368), so no per-object check is possible.
func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
return true, "", nil
}
23 changes: 20 additions & 3 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter.go
Expand Up @@ -23,11 +23,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/controller/constants"
controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
)

type jobsetAdapter struct{}
Expand Down Expand Up @@ -62,9 +63,12 @@ func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client,
if remoteJob.Labels == nil {
remoteJob.Labels = map[string]string{}
}
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[controllerconstants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin

// set the manager
remoteJob.Spec.ManagedBy = ptr.To(jobset.JobSetControllerName)

return remoteClient.Create(ctx, &remoteJob)
}

Expand All @@ -81,6 +85,19 @@ func (b *jobsetAdapter) KeepAdmissionCheckPending() bool {
return false
}

// IsJobManagedByKueue fetches the local JobSet identified by key and verifies
// that its spec.managedBy field names the multikueue controller. It returns
// whether the JobSet can be delegated, a human-readable reason when it cannot,
// and any API error encountered while reading the object.
func (b *jobsetAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
	var js jobset.JobSet
	if err := c.Get(ctx, key, &js); err != nil {
		return false, "", err
	}
	// An unset managedBy defaults to "", which never matches ControllerName.
	if manager := ptr.Deref(js.Spec.ManagedBy, ""); manager != ControllerName {
		// NOTE: this reason text is asserted verbatim in tests; keep it stable.
		return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", ControllerName, manager), nil
	}
	return true, "", nil
}

var _ multiKueueWatcher = (*jobsetAdapter)(nil)

func (*jobsetAdapter) GetEmptyList() client.ObjectList {
Expand All @@ -93,7 +110,7 @@ func (*jobsetAdapter) GetWorkloadKey(o runtime.Object) (types.NamespacedName, er
return types.NamespacedName{}, errors.New("not a jobset")
}

prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[constants.PrebuiltWorkloadLabel]
prebuiltWl, hasPrebuiltWorkload := jobSet.Labels[controllerconstants.PrebuiltWorkloadLabel]
if !hasPrebuiltWorkload {
return types.NamespacedName{}, fmt.Errorf("no prebuilt workload found for jobset: %s", klog.KObj(jobSet))
}
Expand Down
42 changes: 13 additions & 29 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package multikueue

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
Expand All @@ -30,7 +31,7 @@ import (

kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/util/slices"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
Expand All @@ -46,7 +47,7 @@ func TestWlReconcileJobset(t *testing.T) {
}

baseWorkloadBuilder := utiltesting.MakeWorkload("wl1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace).ManagedBy(ControllerName)

cases := map[string]struct {
managersWorkloads []kueue.Workload
Expand All @@ -60,7 +61,7 @@ func TestWlReconcileJobset(t *testing.T) {
wantWorker1Workloads []kueue.Workload
wantWorker1JobSets []jobset.JobSet
}{
"remote wl with reservation, multikueue AC is marked Ready": {
"wl with unmanaged owner is rejected ": {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
Expand All @@ -70,39 +71,22 @@ func TestWlReconcileJobset(t *testing.T) {
},

managersJobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},

worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
State: kueue.CheckStateRejected,
Message: fmt.Sprintf("The owner is not managed by Kueue: Expecting spec.managedBy to be %q not \"\"", ControllerName),
}).
ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersJobsSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
},

wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},
},
"remote jobset status is changed, the status is copied in the local Jobset ": {
Expand All @@ -124,7 +108,7 @@ func TestWlReconcileJobset(t *testing.T) {

worker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
JobsStatus(
jobset.ReplicatedJobStatus{
Name: "replicated-job-1",
Expand Down Expand Up @@ -180,7 +164,7 @@ func TestWlReconcileJobset(t *testing.T) {
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
JobsStatus(
jobset.ReplicatedJobStatus{
Name: "replicated-job-1",
Expand Down Expand Up @@ -215,7 +199,7 @@ func TestWlReconcileJobset(t *testing.T) {

worker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
Obj(),
},
Expand Down Expand Up @@ -252,7 +236,7 @@ func TestWlReconcileJobset(t *testing.T) {
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
Obj(),
},
Expand All @@ -279,7 +263,7 @@ func TestWlReconcileJobset(t *testing.T) {

worker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(controllerconstants.PrebuiltWorkloadLabel, "wl1").
Condition(metav1.Condition{Type: string(jobset.JobSetCompleted), Status: metav1.ConditionTrue}).
Obj(),
},
Expand Down
14 changes: 14 additions & 0 deletions pkg/controller/admissionchecks/multikueue/workload.go
Expand Up @@ -73,6 +73,11 @@ type jobAdapter interface {
// kept Pending while the job runs in a worker. This might be needed to keep the managers job
// suspended and not start the execution locally.
KeepAdmissionCheckPending() bool
// IsJobManagedByKueue returns
// a bool indicating if the job object identified by key is managed by kueue and can be delegated.
// a reason message
// any API error encountered during the check
IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
}

type wlGroup struct {
Expand Down Expand Up @@ -181,6 +186,15 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
}

managed, unamagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
if err != nil {
return reconcile.Result{}, err
}

if !managed {
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unamagedReason))
}

grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
if err != nil {
return reconcile.Result{}, err
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/testingjobs/jobset/wrappers.go
Expand Up @@ -161,3 +161,8 @@ func (j *JobSetWrapper) Condition(c metav1.Condition) *JobSetWrapper {
apimeta.SetStatusCondition(&j.Status.Conditions, c)
return j
}

// ManagedBy sets the wrapped JobSet's spec.managedBy to the given controller
// name and returns the wrapper for chaining.
func (j *JobSetWrapper) ManagedBy(c string) *JobSetWrapper {
	managedBy := c
	j.Spec.ManagedBy = &managedBy
	return j
}
4 changes: 1 addition & 3 deletions site/content/en/docs/concepts/multikueue.md
Expand Up @@ -58,10 +58,8 @@ Known Limitations:
There is an ongoing effort to overcome these limitations by adding the possibility to disable the reconciliation of some jobs by the Kubernetes `batch/Job` controller. Details in `kubernetes/enhancements` [KEP-4368](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/4368-support-managed-by-label-for-batch-jobs#readme).

### JobSet
Known Limitations:
- Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, you should only install the JobSet CRDs, but not the controller.

An approach similar to the one described for [`batch/Job`](#batchjob) is taken into account to overcome this.
Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, MultiKueue expects the JobSets submitted to a ClusterQueue using it to have `spec.managedBy` set to `kueue.x-k8s.io/multikueue`. The JobSet `managedBy` field is available since JobSet v0.5.0.

## Submitting Jobs
In a [configured MultiKueue environment](/docs/tasks/manage/setup_multikueue), you can submit any MultiKueue supported job to the Manager cluster, targeting a ClusterQueue configured for Multikueue.
Expand Down
71 changes: 69 additions & 2 deletions site/content/en/docs/tasks/run/jobsets.md
Expand Up @@ -32,7 +32,15 @@ metadata:
kueue.x-k8s.io/queue-name: user-queue
```

### b. Configure the resource needs
### b. MultiKueue

If the JobSet is submitted to a queue using [MultiKueue](/docs/concepts/multikueue), the `spec.managedBy` field additionally needs to be set to the MultiKueue controller name.

```yaml
spec:
  managedBy: kueue.x-k8s.io/multikueue
```

### c. Configure the resource needs

The resource needs of the workload can be configured in the `spec.replicatedJobs`. Should also be taken into account that number of replicas, [parallelism](https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs) and completions affect the resource calculations.

Expand All @@ -50,7 +58,7 @@ The resource needs of the workload can be configured in the `spec.replicatedJobs
cpu: 1
```

### c. Jobs prioritisation
### d. Jobs prioritisation

The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/#priorityclass) of `spec.replicatedJobs` that is not empty will be used as the priority.

Expand All @@ -66,6 +74,64 @@ The first [PriorityClassName](https://kubernetes.io/docs/concepts/scheduling-evi

The JobSet looks like the following:

### Single Cluster Environment

```yaml
# jobset-sample.yaml
apiVersion: jobset.x-k8s.io/v1alpha2
kind: JobSet
metadata:
generateName: sleep-job-
labels:
kueue.x-k8s.io/queue-name: user-queue
spec:
network:
enableDNSHostnames: false
subdomain: some-subdomain
replicatedJobs:
- name: workers
replicas: 2
template:
spec:
parallelism: 4
completions: 4
backoffLimit: 0
template:
spec:
containers:
- name: sleep
image: busybox
resources:
requests:
cpu: 1
memory: "200Mi"
command:
- sleep
args:
- 100s
- name: driver
template:
spec:
parallelism: 1
completions: 1
backoffLimit: 0
template:
spec:
containers:
- name: sleep
image: busybox
resources:
requests:
cpu: 2
memory: "200Mi"
command:
- sleep
args:
- 100s
```

### MultiKueue Environment

```yaml
# jobset-sample.yaml
apiVersion: jobset.x-k8s.io/v1alpha2
Expand Down Expand Up @@ -118,6 +184,7 @@ spec:
- sleep
args:
- 100s
managedBy: kueue.x-k8s.io/multikueue
```

You can run this JobSet with the following commands:
Expand Down
1 change: 1 addition & 0 deletions test/e2e/multikueue/e2e_test.go
Expand Up @@ -250,6 +250,7 @@ var _ = ginkgo.Describe("MultiKueue", func() {
// Since it requires 2 CPU in total, this jobset can only be admitted in worker 1.
jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
Queue(managerLq.Name).
ManagedBy(multikueue.ControllerName).
ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
Expand Down
2 changes: 2 additions & 0 deletions test/integration/multikueue/multikueue_test.go
Expand Up @@ -394,6 +394,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
ginkgo.It("Should run a jobSet on worker if admitted", func() {
jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
Queue(managerLq.Name).
ManagedBy(multikueue.ControllerName).
ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
Expand Down Expand Up @@ -573,6 +574,7 @@ var _ = ginkgo.Describe("Multikueue", func() {
ginkgo.It("Should requeue the workload with a delay when the connection to the admitting worker is lost", func() {
jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name).
Queue(managerLq.Name).
ManagedBy(multikueue.ControllerName).
ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
Expand Down

0 comments on commit 21142b9

Please sign in to comment.