Skip to content

Commit

Permalink
removed changes from scheduler and workload controller
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentefb committed Apr 3, 2024
1 parent fa2732a commit b194b89
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 88 deletions.
39 changes: 14 additions & 25 deletions pkg/controller/core/workload_controller.go
Expand Up @@ -153,20 +153,25 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// If a deactivated workload is re-activated, we need to reset the RequeueState.
if wl.Status.RequeueState != nil && ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(&wl) {
wl.Status.RequeueState = nil
log.Info("[VICENTE] FIRST IF")
return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
}

if len(wl.ObjectMeta.OwnerReferences) == 0 && !wl.DeletionTimestamp.IsZero() {
log.Info("[VICENTE] SECOND IF")
return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, &wl)
}

if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
log.Info("[VICENTE] THIRD IF")
return ctrl.Result{}, nil
}

cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
if cqOk {
log.Info("[VICENTE] CQOK IF")
if updated, err := r.reconcileSyncAdmissionChecks(ctx, &wl, cqName); updated || err != nil {
log.Info("[VICENTE] FOURTH IF")
return ctrl.Result{}, err
}
}
Expand All @@ -177,22 +182,28 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, err
}
log.Info("[VICENTE] ADMISSION STATUS WAS APPLIED")
if workload.IsAdmitted(&wl) {
log.Info("[VICENTE] FIFTH IF")
c := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, time.Since(c.LastTransitionTime.Time).Seconds())
}
return ctrl.Result{}, nil
}

if workload.HasQuotaReservation(&wl) {
log.Info("[VICENTE] SIXTH IF WORKLAOD HAS QUOTA ALREADY")
if evictionTriggered, err := r.reconcileCheckBasedEviction(ctx, &wl); evictionTriggered || err != nil {
log.Info("[VICENTE] SIXTH IF WORKLAOD HAS QUOTA reconcileCheckBasedEviction", "err", err)
return ctrl.Result{}, err
}

if updated, err := r.reconcileOnClusterQueueActiveState(ctx, &wl, cqName); updated || err != nil {
log.Info("[VICENTE] SIXTH IF WORKLAOD HAS QUOTA reconcileOnClusterQueueActiveState", "err", err)
return ctrl.Result{}, err
}

log.Info("[VICENTE] SIXTH IF WORKLAOD HAS QUOTA reconcileNotReadyTimeout", "wl", wl.Status.Admission)
workload.SyncAdmittedCondition(&wl)
return r.reconcileNotReadyTimeout(ctx, req, &wl)
}

Expand Down Expand Up @@ -238,7 +249,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}

log.Info("[VICENTE] NO ERRORS RETURNING")
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -585,37 +596,15 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
default:
// Workload update in the cache is handled here; however, some fields are immutable
// and are not supposed to actually change anything.
log.Info("[VICENTE INSIDE UPDATE METHOD UPDATING WORKLOAD IN CACHE")
if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil {
log.Error(err, "Updating workload in cache")
}
// This forces you to go through the scheduler to update PodSetAssignments if there's a difference between the
// worker group podSetAssignments.Count of the old and new workload
if features.Enabled(features.DynamicallySizedJobs) && compareAdmissionPodSetAssignmentCount(oldWl.Status.Admission, wlCopy.Status.Admission) {
if !r.queues.UpdateWorkload(oldWl, wlCopy) {
log.V(2).Info("Updated workload due to resize.")
}
}
}

return true
}

// compareAdmissionPodSetAssignmentCount reports whether any PodSetAssignment
// that appears (by name, at the same position) in both the old and the new
// admission changed its Count. It is used to detect a resize (e.g. a RayCluster
// whose worker group scaled up or down) that must go back through the scheduler.
//
// Returns false when either admission is nil, or when the two admissions do not
// carry the same number of PodSetAssignments — those cases are not a resize of
// an already-admitted workload.
func compareAdmissionPodSetAssignmentCount(oldWlAdmission *kueue.Admission, newWlAdmission *kueue.Admission) bool {
	// A workload without quota reservation has a nil Admission; nothing to compare.
	if oldWlAdmission == nil || newWlAdmission == nil {
		return false
	}
	oldPsa := oldWlAdmission.PodSetAssignments
	newPsa := newWlAdmission.PodSetAssignments
	// This is specific to RayClusters: the PodSets are expected to line up
	// positionally (head and workers), so differing lengths mean no resize.
	if len(oldPsa) != len(newPsa) {
		return false
	}
	for i := range oldPsa {
		// NOTE(review): if PodSetAssignment.Count is a pointer type, this
		// compares pointers, not values — confirm against the kueue API version.
		if oldPsa[i].Name == newPsa[i].Name && oldPsa[i].Count != newPsa[i].Count {
			return true
		}
	}
	return false
}

func (r *WorkloadReconciler) Generic(e event.GenericEvent) bool {
r.log.V(3).Info("Ignore generic event", "obj", klog.KObj(e.Object), "kind", e.Object.GetObjectKind().GroupVersionKind())
return false
Expand Down
65 changes: 2 additions & 63 deletions pkg/scheduler/scheduler.go
Expand Up @@ -265,9 +265,6 @@ func (s *Scheduler) schedule(ctx context.Context) {
s.cache.WaitForPodsReady(ctx)
log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
}
if features.Enabled(features.DynamicallySizedJobs) && e.status == assumed {
continue
}
e.status = nominated
if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil {
e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
Expand Down Expand Up @@ -326,21 +323,8 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
ns := corev1.Namespace{}
e := entry{Info: w}
if s.cache.IsAssumedOrAdmittedWorkload(w) {
if features.Enabled(features.DynamicallySizedJobs) {
// here we want to get the flavors and resources assigned again so that we can update PodSetAssignments
e.assignment, e.preemptionTargets = s.getResizeAssignment(log, &e.Info, &snap)
e.inadmissibleMsg = e.assignment.Message()
e.Info.LastAssignment = &e.assignment.LastState
e.status = assumed
if err := s.updateResizePodSetAssignments(ctx, e); err != nil {
log.Error(err, "Could not apploy admission to assumed workload")
continue
}

} else {
log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj))
continue
}
log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj))
continue
} else if workload.HasRetryOrRejectedChecks(w.Obj) {
e.inadmissibleMsg = "The workload has failed admission checks"
} else if snap.InactiveClusterQueueSets.Has(w.ClusterQueue) {
Expand Down Expand Up @@ -403,29 +387,6 @@ type partialAssignment struct {
preemptionTargets []*workload.Info
}

// getResizeAssignment recomputes a flavor assignment for an already admitted
// workload that is being resized, so its PodSetAssignments can be refreshed.
// It returns the new assignment together with any preemption targets that
// would be needed to make it fit.
func (s *Scheduler) getResizeAssignment(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
	cq := snap.ClusterQueues[wl.ClusterQueue]
	assignment := flavorassigner.New(wl, cq, snap.ResourceFlavors).Assign(log, nil)

	switch assignment.RepresentativeMode() {
	case flavorassigner.Fit:
		// Everything fits as-is; no preemption required.
		return assignment, nil
	case flavorassigner.Preempt:
		targets := s.preemptor.GetTargets(*wl, assignment, snap)
		// Surface the targets when PartialAdmission is disabled, or whenever
		// preemption can actually make room.
		if !features.Enabled(features.PartialAdmission) || len(targets) > 0 {
			return assignment, targets
		}
		return assignment, nil
	default:
		// Neither Fit nor Preempt: hand back the assignment with no targets.
		return assignment, nil
	}
}

func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
cq := snap.ClusterQueues[wl.ClusterQueue]
flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors)
Expand Down Expand Up @@ -526,28 +487,6 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e
return nil
}

// updateResizePodSetAssignments writes the recomputed admission (ClusterQueue
// plus fresh PodSetAssignments) for an already-assumed, resized workload back
// to the apiserver. Unlike admit, it does not re-queue the workload: a resize
// such as a scale down only needs the stored admission updated in place.
func (s *Scheduler) updateResizePodSetAssignments(ctx context.Context, e entry) error {
// Work on a copy so the cached object is not mutated before the apply succeeds.
newWorkload := e.Obj.DeepCopy()
admission := &kueue.Admission{
ClusterQueue: kueue.ClusterQueueReference(e.ClusterQueue),
PodSetAssignments: e.assignment.ToAPI(),
}

workload.SetQuotaReservation(newWorkload, admission)
// Return value deliberately ignored: we only need the condition synced on the
// copy; whether it changed does not affect the apply below.
_ = workload.SyncAdmittedCondition(newWorkload)

if e.status == assumed {
// Apply admission means to update the workload with the new admission status, this is for the case of a scale down
// we shouldn't requeue a scale down we should only update the workload
return s.applyAdmission(ctx, newWorkload)
}

// Non-assumed entries are not expected here (caller sets status to assumed);
// treat them as a no-op rather than touching the apiserver.
return nil
}

// admit sets the admitting clusterQueue and flavors into the workload of
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
Expand Down

0 comments on commit b194b89

Please sign in to comment.