Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiKueue JobSet managedBy #1870

Merged
merged 5 commits into from Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 1 addition & 7 deletions hack/multikueue-e2e-test.sh
Expand Up @@ -78,14 +78,8 @@ function kind_load {


# JOBSET SETUP
# MANAGER
# Only install the CRDs and not the controller to be able to
# have JobSets admitted without execution in the manager cluster.
kubectl config use-context kind-${MANAGER_KIND_CLUSTER_NAME}
kubectl apply --server-side -f ${JOBSET_CRDS}/*

#WORKERS
docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION
install_jobset $MANAGER_KIND_CLUSTER_NAME
install_jobset $WORKER1_KIND_CLUSTER_NAME
install_jobset $WORKER2_KIND_CLUSTER_NAME
fi
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/admissionchecks/multikueue/batchjob_adapter.go
Expand Up @@ -101,3 +101,7 @@ func (b *batchJobAdapter) DeleteRemoteObject(ctx context.Context, remoteClient c
func (b *batchJobAdapter) KeepAdmissionCheckPending() bool {
return true
}

func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (b *batchJobAdapter) IsJobManagedByKueue(_ context.Context, _ client.Client, _ types.NamespacedName) (bool, string, error) {
func (b *batchJobAdapter) IsJobManagedByKueue(context.Context, client.Client, types.NamespacedName) (bool, string, error) {

This is a minor nit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have the managedBy field available for Jobs soon and we'll need to properly implement the method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense.

return true, "", nil
}
17 changes: 17 additions & 0 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter.go
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

Expand Down Expand Up @@ -65,6 +66,9 @@ func (b *jobsetAdapter) SyncJob(ctx context.Context, localClient client.Client,
remoteJob.Labels[constants.PrebuiltWorkloadLabel] = workloadName
remoteJob.Labels[kueuealpha.MultiKueueOriginLabel] = origin

// set the manager
remoteJob.Spec.ManagedBy = ptr.To(jobset.JobSetControllerName)

return remoteClient.Create(ctx, &remoteJob)
}

Expand All @@ -81,6 +85,19 @@ func (b *jobsetAdapter) KeepAdmissionCheckPending() bool {
return false
}

// IsJobManagedByKueue fetches the JobSet identified by key through c and checks
// whether its spec.managedBy equals ControllerName.
//
// It returns:
//   - true with an empty message when the values match;
//   - false with an explanatory message when they differ (a nil spec.managedBy
//     is compared as the empty string);
//   - false together with the error when the Get call fails.
func (b *jobsetAdapter) IsJobManagedByKueue(ctx context.Context, c client.Client, key types.NamespacedName) (bool, string, error) {
	var localJobSet jobset.JobSet
	if err := c.Get(ctx, key, &localJobSet); err != nil {
		return false, "", err
	}

	// A nil spec.managedBy is treated as "" for the comparison and the message.
	if managedBy := ptr.Deref(localJobSet.Spec.ManagedBy, ""); managedBy != ControllerName {
		return false, fmt.Sprintf("Expecting spec.managedBy to be %q not %q", ControllerName, managedBy), nil
	}
	return true, "", nil
}

var _ multiKueueWatcher = (*jobsetAdapter)(nil)

func (*jobsetAdapter) GetEmptyList() client.ObjectList {
Expand Down
30 changes: 7 additions & 23 deletions pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package multikueue

import (
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestWlReconcileJobset(t *testing.T) {
}

baseWorkloadBuilder := utiltesting.MakeWorkload("wl1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace)
baseJobSetBuilder := testingjobset.MakeJobSet("jobset1", TestNamespace).ManagedBy(ControllerName)

cases := map[string]struct {
managersWorkloads []kueue.Workload
Expand All @@ -60,7 +61,7 @@ func TestWlReconcileJobset(t *testing.T) {
wantWorker1Workloads []kueue.Workload
wantWorker1JobSets []jobset.JobSet
}{
"remote wl with reservation, multikueue AC is marked Ready": {
"wl with unmanaged owner is rejected ": {
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
Expand All @@ -70,39 +71,22 @@ func TestWlReconcileJobset(t *testing.T) {
},

managersJobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},

worker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{
Name: "ac1",
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
State: kueue.CheckStateRejected,
Message: fmt.Sprintf("The owner is not managed by Kueue: Expecting spec.managedBy to be %q not \"\"", ControllerName),
}).
ControllerReference(jobset.SchemeGroupVersion.WithKind("JobSet"), "jobset1", "uid1").
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantManagersJobsSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().Obj(),
},

wantWorker1Workloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
},
wantWorker1JobSets: []jobset.JobSet{
*baseJobSetBuilder.DeepCopy().
Label(constants.PrebuiltWorkloadLabel, "wl1").
Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin).
Obj(),
*testingjobset.MakeJobSet("jobset1", TestNamespace).Obj(),
},
},
"remote jobset status is changed, the status is copied in the local Jobset ": {
Expand Down
84 changes: 58 additions & 26 deletions pkg/controller/admissionchecks/multikueue/workload.go
Expand Up @@ -73,6 +73,11 @@ type jobAdapter interface {
// kept Pending while the job runs in a worker. This might be needed to keep the managers job
// suspended and not start the execution locally.
KeepAdmissionCheckPending() bool
// IsJobManagedByKueue returns:
// - a bool indicating if the job object identified by key is managed by Kueue and can be delegated.
// - a reason message, set when the job is not managed by Kueue.
// - any API error encountered during the check.
trasc marked this conversation as resolved.
Show resolved Hide resolved
IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error)
}

type wlGroup struct {
Expand Down Expand Up @@ -135,7 +140,7 @@ func (g *wlGroup) RemoveRemoteObjects(ctx context.Context, cluster string) error

if controllerutil.RemoveFinalizer(remWl, kueue.ResourceInUseFinalizerName) {
if err := g.remoteClients[cluster].client.Update(ctx, remWl); err != nil {
return fmt.Errorf("removing remote workloads finalizeer: %w", err)
return fmt.Errorf("removing remote workloads finalizer: %w", err)
}
}

Expand All @@ -159,19 +164,54 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
// 1. use a finalizer
// 2. try to trigger the remote deletion from an event filter.

grp, err := a.readGroup(ctx, wl)
mkAc, err := a.multikueueAC(ctx, wl)
if err != nil {
return reconcile.Result{}, err
}

if grp == nil {
if mkAc == nil || mkAc.State == kueue.CheckStateRejected {
log.V(2).Info("Skip Workload")
return reconcile.Result{}, nil
}

adapter, owner := a.adapter(wl)
if adapter == nil {
// Reject the workload since there is no chance for it to run.
var rejectionMessage string
if owner != nil {
rejectionMessage = fmt.Sprintf("No multikueue adapter found for owner kind %q", schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind).String())
} else {
rejectionMessage = "No multikueue adapter found"
}
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
}

managed, unamagedReason, err := adapter.IsJobManagedByKueue(ctx, a.client, types.NamespacedName{Name: owner.Name, Namespace: wl.Namespace})
if err != nil {
return reconcile.Result{}, err
}

if !managed {
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, fmt.Sprintf("The owner is not managed by Kueue: %s", unamagedReason))
}

grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
if err != nil {
return reconcile.Result{}, err
}

return a.reconcileGroup(ctx, grp)
}

// updateACS overwrites the state and message of acs, stamps its
// LastTransitionTime with the current time, and sends the result as an apply
// patch against the status of wl, using ControllerName as the field owner with
// forced ownership.
//
// acs is modified in place. The returned error is whatever the patch call returns.
func (w *wlReconciler) updateACS(ctx context.Context, wl *kueue.Workload, acs *kueue.AdmissionCheckState, status kueue.CheckState, message string) error {
	acs.LastTransitionTime = metav1.NewTime(time.Now())
	acs.State, acs.Message = status, message

	// Build the patch object from wl and record the updated check state on it.
	patch := workload.BaseSSAWorkload(wl)
	workload.SetAdmissionCheckState(&patch.Status.AdmissionChecks, *acs)

	return w.client.Status().Patch(
		ctx,
		patch,
		client.Apply,
		client.FieldOwner(ControllerName),
		client.ForceOwnership,
	)
}

func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (map[string]*remoteClient, error) {
cfg, err := w.helper.ConfigForAdmissionCheck(ctx, acName)
if err != nil {
Expand All @@ -192,42 +232,39 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m
return clients, nil
}

func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload) (*wlGroup, error) {
relevantChecks, err := admissioncheck.FilterForController(ctx, a.client, local.Status.AdmissionChecks, ControllerName)
// multikueueAC looks up the admission check state in local's status that
// belongs to this controller.
//
// The check names are obtained from admissioncheck.FilterForController for
// ControllerName; only the first one is used. It returns nil, nil when no check
// name is reported, and nil plus the error when the filtering fails.
func (w *wlReconciler) multikueueAC(ctx context.Context, local *kueue.Workload) (*kueue.AdmissionCheckState, error) {
	checks := local.Status.AdmissionChecks

	names, err := admissioncheck.FilterForController(ctx, w.client, checks, ControllerName)
	switch {
	case err != nil:
		return nil, err
	case len(names) == 0:
		return nil, nil
	}

	return workload.FindAdmissionCheck(checks, names[0]), nil
}

rClients, err := a.remoteClientsForAC(ctx, relevantChecks[0])
if err != nil {
return nil, fmt.Errorf("admission check %q: %w", relevantChecks[0], err)
}

// Lookup the adapter.
var adapter jobAdapter
controllerKey := types.NamespacedName{}
func (w *wlReconciler) adapter(local *kueue.Workload) (jobAdapter, *metav1.OwnerReference) {
if controller := metav1.GetControllerOf(local); controller != nil {
adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
adapter = adapters[adapterKey]
controllerKey.Namespace = local.Namespace
controllerKey.Name = controller.Name
return adapters[adapterKey], controller
}
return nil, nil
}

if adapter == nil {
return nil, nil
func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload, acName string, adapter jobAdapter, controllerName string) (*wlGroup, error) {
rClients, err := a.remoteClientsForAC(ctx, acName)
if err != nil {
return nil, fmt.Errorf("admission check %q: %w", acName, err)
}

grp := wlGroup{
local: local,
remotes: make(map[string]*kueue.Workload, len(rClients)),
remoteClients: rClients,
acName: relevantChecks[0],
acName: acName,
jobAdapter: adapter,
controllerKey: controllerKey,
controllerKey: types.NamespacedName{Name: controllerName, Namespace: local.Namespace},
}

for remote, rClient := range rClients {
Expand Down Expand Up @@ -261,12 +298,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

if !workload.HasQuotaReservation(group.local) && acs.State == kueue.CheckStateRetry {
acs.State = kueue.CheckStatePending
acs.Message = "Requeued"
acs.LastTransitionTime = metav1.NewTime(time.Now())
wlPatch := workload.BaseSSAWorkload(group.local)
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
errs = append(errs, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership))
errs = append(errs, a.updateACS(ctx, group.local, acs, kueue.CheckStatePending, "Requeued"))
}

return reconcile.Result{}, errors.Join(errs...)
Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/admissionchecks/multikueue/workload_test.go
Expand Up @@ -93,7 +93,7 @@ func TestWlReconcile(t *testing.T) {
*baseWorkloadBuilder.Clone().Obj(),
},
},
"unmanaged wl (no parent) is ignored": {
"unmanaged wl (no parent) is rejected": {
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
Expand All @@ -102,9 +102,24 @@ func TestWlReconcile(t *testing.T) {
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: "No multikueue adapter found"}).
Obj(),
},
},
"unmanaged wl (owned by pod) is rejected": {
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: `No multikueue adapter found for owner kind "/v1, Kind=Pod"`}).
Obj(),
},
},
"failing to read from a worker": {
reconcileFor: "wl1",
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/testingjobs/jobset/wrappers.go
Expand Up @@ -161,3 +161,8 @@ func (j *JobSetWrapper) Condition(c metav1.Condition) *JobSetWrapper {
apimeta.SetStatusCondition(&j.Status.Conditions, c)
return j
}

// ManagedBy sets the wrapped JobSet's spec.managedBy to c and returns the
// wrapper so that calls can be chained.
func (j *JobSetWrapper) ManagedBy(c string) *JobSetWrapper {
	managedBy := c
	j.Spec.ManagedBy = &managedBy
	return j
}
4 changes: 1 addition & 3 deletions site/content/en/docs/concepts/multikueue.md
Expand Up @@ -58,10 +58,8 @@ Known Limitations:
There is an ongoing effort to overcome these limitations by adding the possibility to disable the reconciliation of some jobs by the Kubernetes `batch/Job` controller. Details in `kubernetes/enhancements` [KEP-4368](https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/4368-support-managed-by-label-for-batch-jobs#readme).

### JobSet
Known Limitations:
- Since unsuspending a JobSet in the manager cluster will lead to its local execution and updating the status of a local JobSet could conflict with its main controller, you should only install the JobSet CRDs, but not the controller.

An approach similar to the one described for [`batch/Job`](#batchjob) is taken into account to overcome this.
Because unsuspending a JobSet in the manager cluster would start its local execution, and updating the status of a local JobSet could conflict with its main controller, MultiKueue expects JobSets submitted to a ClusterQueue that uses it to have `spec.managedBy` set to `kueue.x-k8s.io/multikueue`. The JobSet `managedBy` field is available since JobSet v0.5.0.
trasc marked this conversation as resolved.
Show resolved Hide resolved

## Submitting Jobs
In a [configured MultiKueue environment](/docs/tasks/manage/setup_multikueue), you can submit any MultiKueue-supported job to the Manager cluster, targeting a ClusterQueue configured for MultiKueue.
Expand Down