Skip to content

Commit

Permalink
[multikueue] Reject Workload with unsupported owner type.
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Mar 20, 2024
1 parent ba46285 commit 168eda9
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 27 deletions.
70 changes: 44 additions & 26 deletions pkg/controller/admissionchecks/multikueue/workload.go
Expand Up @@ -135,7 +135,7 @@ func (g *wlGroup) RemoveRemoteObjects(ctx context.Context, cluster string) error

if controllerutil.RemoveFinalizer(remWl, kueue.ResourceInUseFinalizerName) {
if err := g.remoteClients[cluster].client.Update(ctx, remWl); err != nil {
return fmt.Errorf("removing remote workloads finalizeer: %w", err)
return fmt.Errorf("removing remote workloads finalizer: %w", err)
}
}

Expand All @@ -159,19 +159,45 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
// 1. use a finalizer
// 2. try to trigger the remote deletion from an event filter.

grp, err := a.readGroup(ctx, wl)
mkAc, err := a.multikueueAC(ctx, wl)
if err != nil {
return reconcile.Result{}, err
}

if grp == nil {
if mkAc == nil || mkAc.State == kueue.CheckStateRejected {
log.V(2).Info("Skip Workload")
return reconcile.Result{}, nil
}

adapter, owner := a.adapter(wl)
if adapter == nil {
// Reject the workload since there is no chance for it to run.
var rejectionMessage string
if owner != nil {
rejectionMessage = fmt.Sprintf("No multikueue adapter found for owner kind %q", schema.FromAPIVersionAndKind(owner.APIVersion, owner.Kind).String())
} else {
rejectionMessage = "No multikueue adapter found"
}
return reconcile.Result{}, a.updateACS(ctx, wl, mkAc, kueue.CheckStateRejected, rejectionMessage)
}

grp, err := a.readGroup(ctx, wl, mkAc.Name, adapter, owner.Name)
if err != nil {
return reconcile.Result{}, err
}

return a.reconcileGroup(ctx, grp)
}

func (w *wlReconciler) updateACS(ctx context.Context, wl *kueue.Workload, acs *kueue.AdmissionCheckState, status kueue.CheckState, message string) error {
acs.State = status
acs.Message = message
acs.LastTransitionTime = metav1.NewTime(time.Now())
wlPatch := workload.BaseSSAWorkload(wl)
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
return w.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)
}

func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (map[string]*remoteClient, error) {
cfg, err := w.helper.ConfigForAdmissionCheck(ctx, acName)
if err != nil {
Expand All @@ -192,42 +218,39 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m
return clients, nil
}

func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload) (*wlGroup, error) {
relevantChecks, err := admissioncheck.FilterForController(ctx, a.client, local.Status.AdmissionChecks, ControllerName)
func (w *wlReconciler) multikueueAC(ctx context.Context, local *kueue.Workload) (*kueue.AdmissionCheckState, error) {
relevantChecks, err := admissioncheck.FilterForController(ctx, w.client, local.Status.AdmissionChecks, ControllerName)
if err != nil {
return nil, err
}

if len(relevantChecks) == 0 {
return nil, nil
}
return workload.FindAdmissionCheck(local.Status.AdmissionChecks, relevantChecks[0]), nil
}

rClients, err := a.remoteClientsForAC(ctx, relevantChecks[0])
if err != nil {
return nil, fmt.Errorf("admission check %q: %w", relevantChecks[0], err)
}

// Lookup the adapter.
var adapter jobAdapter
controllerKey := types.NamespacedName{}
func (w *wlReconciler) adapter(local *kueue.Workload) (jobAdapter, *metav1.OwnerReference) {
if controller := metav1.GetControllerOf(local); controller != nil {
adapterKey := schema.FromAPIVersionAndKind(controller.APIVersion, controller.Kind).String()
adapter = adapters[adapterKey]
controllerKey.Namespace = local.Namespace
controllerKey.Name = controller.Name
return adapters[adapterKey], controller
}
return nil, nil
}

if adapter == nil {
return nil, nil
func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload, acName string, adapter jobAdapter, controllerName string) (*wlGroup, error) {
rClients, err := a.remoteClientsForAC(ctx, acName)
if err != nil {
return nil, fmt.Errorf("admission check %q: %w", acName, err)
}

grp := wlGroup{
local: local,
remotes: make(map[string]*kueue.Workload, len(rClients)),
remoteClients: rClients,
acName: relevantChecks[0],
acName: acName,
jobAdapter: adapter,
controllerKey: controllerKey,
controllerKey: types.NamespacedName{Name: controllerName, Namespace: local.Namespace},
}

for remote, rClient := range rClients {
Expand Down Expand Up @@ -261,12 +284,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

if !workload.HasQuotaReservation(group.local) && acs.State == kueue.CheckStateRetry {
acs.State = kueue.CheckStatePending
acs.Message = "Requeued"
acs.LastTransitionTime = metav1.NewTime(time.Now())
wlPatch := workload.BaseSSAWorkload(group.local)
workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs)
errs = append(errs, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership))
errs = append(errs, a.updateACS(ctx, group.local, acs, kueue.CheckStatePending, "Requeued"))
}

return reconcile.Result{}, errors.Join(errs...)
Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/admissionchecks/multikueue/workload_test.go
Expand Up @@ -93,7 +93,7 @@ func TestWlReconcile(t *testing.T) {
*baseWorkloadBuilder.Clone().Obj(),
},
},
"unmanaged wl (no parent) is ignored": {
"unmanaged wl (no parent) is rejected": {
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
Expand All @@ -102,9 +102,24 @@ func TestWlReconcile(t *testing.T) {
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: "No multikueue adapter found"}).
Obj(),
},
},
"unmanaged wl (owned by pod) is rejected": {
reconcileFor: "wl1",
managersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
Obj(),
},
wantManagersWorkloads: []kueue.Workload{
*baseWorkloadBuilder.Clone().
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "uid1").
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStateRejected, Message: `No multikueue adapter found for owner kind "/v1, Kind=Pod"`}).
Obj(),
},
},
"failing to read from a worker": {
reconcileFor: "wl1",
Expand Down

0 comments on commit 168eda9

Please sign in to comment.