Skip to content

Commit

Permalink
MultiKueue JobSet managedBy (kubernetes-sigs#1870)
Browse files Browse the repository at this point in the history
* [multikueue] Reject Workload with unsupported owner type.

* [multikueue/jobset] Use `managedBy` field support.

* Review Remarks.

* Review Remarks

* Review remarks
  • Loading branch information
trasc authored and vsoch committed Apr 18, 2024
1 parent 93933eb commit 5e5fe26
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 62 deletions.
8 changes: 1 addition & 7 deletions hack/multikueue-e2e-test.sh
Expand Up @@ -78,14 +78,8 @@ function kind_load {


# JOBSET SETUP
# MANAGER
# Only install the CRDs and not the controller to be able to
# have JobSets admitted without execution in the manager cluster.
kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME}
kubectl apply --server-side -f ${JOBSET_CRDS}/*

#WORKERS
docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION
install_jobset $MANAGER_KIND_CLUSTER_NAME
install_jobset $WORKER1_KIND_CLUSTER_NAME
install_jobset $WORKER2_KIND_CLUSTER_NAME
fi
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
Expand Up @@ -101,3 +101,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
// KeepAdmissionCheckPending reports whether the MultiKueue admission check
// should stay Pending while the job executes on a worker cluster. Returning
// true keeps the manager's copy of the batch/Job suspended so it is not run
// locally in parallel with the remote execution.
func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
	return true
}

// IsJobManagedByKueue always reports the batch/Job as manageable by Kueue.
// Unlike the JobSet adapter, no spec.managedBy-style ownership check is
// performed here, so every batch/Job owner is accepted for delegation.
func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
	return true, "", nil
}
17 changes: 17 additions & 0 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter.go
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

Expand Down Expand Up @@ -65,6 +66,9 @@ func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client,
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin

// set the manager
remoteJob.Spec.ManagedBy = ptr.To(jobset.JobSetControllerName)

return remoteClient.Create(ctx, &remoteJob)
}

Expand All @@ -81,6 +85,19 @@ func (b *jobsetAdapter) KeepAdmissionCheckPending() bool {
return false
}

// IsJobManagedByKueue fetches the JobSet identified by key and verifies that
// its spec.managedBy field names the MultiKueue controller. It returns:
//   - true when the JobSet is delegable to MultiKueue,
//   - a human-readable reason when the managedBy value is missing or different,
//   - any API error encountered while reading the JobSet.
func (b *jobsetAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
	var js jobset.JobSet
	if err := c.Get(ctx, key, &js); err != nil {
		return false, "", err
	}
	// An unset managedBy dereferences to "" and is rejected like any other
	// foreign controller name.
	if manager := ptr.Deref(js.Spec.ManagedBy, ""); manager != ControllerName {
		return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", ControllerName, manager), nil
	}
	return true, "", nil
}

var _ multiKueueWatcher = (*jobsetAdapter)(nil)

func (*jobsetAdapter) GetEmptyList() client.ObjectList {
Expand Down
30 changes: 7 additions & 23 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package multikueue

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestWlReconcileJobset(t *testing.T) {
}

baseWorkloadBuilder := utiltesting.MakeWorkload("wl1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace).ManagedBy(ControllerName)

cases := map[string]struct {
managersWorkloads []kueue.Workload
Expand All @@ -60,7 +61,7 @@ func TestWlReconcileJobset(t *testing.T) {
wantWorker1Workloads []kueue.Workload
wantWorker1JobSets []jobset.JobSet
}{
"remote wl with reservation, multikueue AC is marked Ready": {
"wl with unmanaged owner is rejected ": {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
Expand All @@ -70,39 +71,22 @@ func TestWlReconcileJobset(t *testing.T) {
},

managersJobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},

worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
State: kueue.CheckStateRejected,
Message: fmt.Sprintf("The owner is not managed by Kueue: Expecting spec.managedBy to be %q not \"\"", ControllerName),
}).
ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersJobsSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
},

wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},
},
"remote jobset status is changed, the status is copied in the local Jobset ": {
Expand Down
84 changes: 58 additions & 26 deletions pkg/controller/admissionchecks/multikueue/workload.go
Expand Up @@ -73,6 +73,11 @@ type jobAdapter interface {
// kept Pending while the job runs in a worker. This might be needed to keep the managers job
// suspended and not start the execution locally.
KeepAdmissionCheckPending() bool
// IsJobManagedByKueue returns:
// - a bool indicating if the job object identified by key is managed by kueue and can be delegated.
// - a reason indicating why the job is not managed by Kueue
// - any API error encountered during the check
IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
}

type wlGroup struct {
Expand Down Expand Up @@ -135,7 +140,7 @@ func (g *wlGroup) RemoveRemoteObjects(ctx context.Context, cluster string) error

if controllerutil.RemoveFinalizer(remWl, kueue.ResourceInUseFinalizerName) {
if err := g.remoteClients[cluster].client.Update(ctx, remWl); err != nil {
return fmt.Errorf("removing remote workloads finalizeer: %w", err)
return fmt.Errorf("removing remote workloads finalizer: %w", err)
}
}

Expand All @@ -159,19 +164,54 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
// 1. use a finalizer
// 2. try to trigger the remote deletion from an event filter.

grp, err := a.readGroup(ctx, wl)
mkAc, err := a.multikueueAC(ctx, wl)
if err != nil {
return reconcile.Result{}, err
}

if grp == nil {
if mkAc == nil || mkAc.State == kueue.CheckStateRejected {
log.V(2).Info("Skip Workload")
return reconcile.Result{}, nil
}

adapter, owner := a.adapter(wl)
if adapter == nil {
// Reject the workload since there is no chance for it to run.
var rejectionMessage string
if owner != nil {
rejectionMessage = fmt.Sprintf("No multikueue adapter found for owner kind %q", schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind).String())
} else {
rejectionMessage = "No multikueue adapter found"
}
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
}

managed, unamagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
if err != nil {
return reconcile.Result{}, err
}

if !managed {
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unamagedReason))
}

grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
if err != nil {
return reconcile.Result{}, err
}

return a.reconcileGroup(ctx, grp)
}

// updateACS records a new state and message on the given admission check
// state, stamps the transition time, and applies the change to the workload's
// status using a server-side-apply patch owned by the MultiKueue controller.
func (w *wlReconciler) updateACS(ctx context.Context, wl *kueue.Workload, acs *kueue.AdmissionCheckState, status kueue.CheckState, message string) error {
	acs.State = status
	acs.Message = message
	acs.LastTransitionTime = metav1.NewTime(time.Now())
	// Patch only the admission-check entry via SSA so other status fields
	// owned by different controllers are left untouched.
	wlPatch := workload.BaseSSAWorkload(wl)
	workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
	return w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
}

func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (map[string]*remoteClient, error) {
cfg, err := w.helper.ConfigForAdmissionCheck(ctx, acName)
if err != nil {
Expand All @@ -192,42 +232,39 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m
return clients, nil
}

func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload) (*wlGroup, error) {
relevantChecks, err := admissioncheck.FilterForController(ctx, a.client, local.Status.AdmissionChecks, ControllerName)
// multikueueAC returns the state of the first admission check on the local
// workload that is controlled by MultiKueue, or nil when the workload has no
// such check (in which case it is not MultiKueue-relevant).
func (w *wlReconciler) multikueueAC(ctx context.Context, local *kueue.Workload) (*kueue.AdmissionCheckState, error) {
	relevantChecks, err := admissioncheck.FilterForController(ctx, w.client, local.Status.AdmissionChecks, ControllerName)
	if err != nil {
		return nil, err
	}

	if len(relevantChecks) == 0 {
		return nil, nil
	}
	return workload.FindAdmissionCheck(local.Status.AdmissionChecks, relevantChecks[0]), nil
}

rClients, err := a.remoteClientsForAC(ctx, relevantChecks[0])
if err != nil {
return nil, fmt.Errorf("admission check %q: %w", relevantChecks[0], err)
}

// Lookup the adapter.
var adapter jobAdapter
controllerKey := types.NamespacedName{}
// adapter resolves the MultiKueue job adapter for the workload's controlling
// owner. It returns the adapter (nil when the owner kind has no registered
// adapter) together with the owner reference itself, or (nil, nil) when the
// workload has no controller owner.
//
// NOTE(review): the scraped diff interleaved deleted lines from the old
// readGroup here; this is the clean post-change function body.
func (w *wlReconciler) adapter(local *kueue.Workload) (jobAdapter, *metav1.OwnerReference) {
	if controller := metav1.GetControllerOf(local); controller != nil {
		// Adapters are keyed by the owner's "group/version, Kind=..." string.
		adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
		return adapters[adapterKey], controller
	}
	return nil, nil
}

if adapter == nil {
return nil, nil
func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload, acName string, adapter jobAdapter, controllerName string) (*wlGroup, error) {
rClients, err := a.remoteClientsForAC(ctx, acName)
if err != nil {
return nil, fmt.Errorf("admission check %q: %w", acName, err)
}

grp := wlGroup{
local: local,
remotes: make(map[string]*kueue.Workload, len(rClients)),
remoteClients: rClients,
acName: relevantChecks[0],
acName: acName,
jobAdapter: adapter,
controllerKey: controllerKey,
controllerKey: types.NamespacedName{Name: controllerName, Namespace: local.Namespace},
}

for remote, rClient := range rClients {
Expand Down Expand Up @@ -261,12 +298,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

if !workload.HasQuotaReservation(group.local) && acs.State == kueue.CheckStateRetry {
acs.State = kueue.CheckStatePending
acs.Message = "Requeued"
acs.LastTransitionTime = metav1.NewTime(time.Now())
wlPatch := workload.BaseSSAWorkload(group.local)
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
errs = append(errs, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership))
errs = append(errs, a.updateACS(ctx, group.local, acs, kueue.CheckStatePending, "Requeued"))
}

return reconcile.Result{}, errors.Join(errs...)
Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/admissionchecks/multikueue/workload_test.go
Expand Up @@ -93,7 +93,7 @@ func TestWlReconcile(t *testing.T) {
*baseWorkloadBuilder.Clone().Obj(),
},
},
"unmanaged wl (no parent) is ignored": {
"unmanaged wl (no parent) is rejected": {
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
Expand All @@ -102,9 +102,24 @@ func TestWlReconcile(t *testing.T) {
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: "No multikueue adapter found"}).
Obj(),
},
},
"unmanaged wl (owned by pod) is rejected": {
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: `No multikueue adapter found for owner kind "/v1, Kind=Pod"`}).
Obj(),
},
},
"failing to read from a worker": {
reconcileFor: "wl1",
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/testingjobs/jobset/wrappers.go
Expand Up @@ -161,3 +161,8 @@ func (j *JobSetWrapper) Condition(c metav1.Condition) *JobSetWrapper {
apimeta.SetStatusCondition(&j.Status.Conditions, c)
return j
}

// ManagedBy sets the spec.managedBy field of the wrapped JobSet and returns
// the wrapper for chaining.
func (j *JobSetWrapper) ManagedBy(c string) *JobSetWrapper {
	controllerName := c
	j.Spec.ManagedBy = &controllerName
	return j
}
5 changes: 2 additions & 3 deletions site/content/en/docs/concepts/multikueue.md
Expand Up @@ -58,10 +58,9 @@ Known Limitations:
There is an ongoing effort to overcome these limitations by adding the possibility to disable the reconciliation of some jobs by the Kubernetes `batch/Job` controller. Details in `kubernetes/enhancements` [KEP-4368](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/4368-support-managed-by-label-for-batch-jobs#readme).

### JobSet
Known Limitations:
- Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, you should only install the JobSet CRDs, but not the controller.

An approach similar to the one described for [`batch/Job`](#batchjob) is taken into account to overcome this.
When you want to submit JobSets to a ClusterQueue with a MultiKueue admission check, you should set the `spec.managedBy` field to `kueue.x-k8s.io/multikueue`; otherwise the admission check controller will `Reject` the workload, causing it to be marked as `Finished` with an error indicating the cause.
The `managedBy` field is available in JobSet v0.5.0 and newer.

## Submitting Jobs
In a [configured MultiKueue environment](/docs/tasks/manage/setup_multikueue), you can submit any MultiKueue supported job to the Manager cluster, targeting a ClusterQueue configured for Multikueue.
Expand Down

0 comments on commit 5e5fe26

Please sign in to comment.