Skip to content

Commit

Permalink
Delay requeueing based on RequeueState
Browse files Browse the repository at this point in the history
Change-Id: I5431e741cb30541a846b65fb26aa0b7b631eb80d
  • Loading branch information
alculquicondor committed Mar 12, 2024
1 parent 1e66b43 commit 40485a2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
23 changes: 21 additions & 2 deletions pkg/controller/core/workload_controller.go
Expand Up @@ -542,8 +542,27 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
log.Error(err, "Failed to delete workload from cache")
}
})
if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
var backoff time.Duration
if wlCopy.Status.RequeueState != nil && wlCopy.Status.RequeueState.RequeueAt != nil {
backoff = time.Until(wl.Status.RequeueState.RequeueAt.Time)
}
if backoff <= 0 {
if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
}
} else {
log.V(3).Info("Workload to be requeued after backoff", "backoff", backoff, "requeueAt", wl.Status.RequeueState.RequeueAt.Time)
time.AfterFunc(backoff, func() {
updatedWl := kueue.Workload{}
err := r.client.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWl)
if err == nil && workloadStatus(&updatedWl) == pending {
if !r.queues.AddOrUpdateWorkload(wlCopy) {
log.V(2).Info("Queue for workload didn't exist; ignored for now")
} else {
log.V(3).Info("Workload requeued after backoff")
}
}
})
}
case prevStatus == admitted && status == admitted && !equality.Semantic.DeepEqual(oldWl.Status.ReclaimablePods, wl.Status.ReclaimablePods):
// trigger the move of associated inadmissibleWorkloads, if there are any.
Expand Down
12 changes: 12 additions & 0 deletions pkg/queue/cluster_queue_impl_test.go
Expand Up @@ -328,6 +328,18 @@ func TestClusterQueueImpl(t *testing.T) {
inadmissibleWorkloadsToRequeue: []*workload.Info{workload.NewInfo(workloads[1]), workload.NewInfo(workloads[1])},
wantPending: 1,
},
"update reclaimable pods in inadmissible": {
inadmissibleWorkloadsToRequeue: []*workload.Info{
workload.NewInfo(utiltesting.MakeWorkload("w", "").PodSets(*utiltesting.MakePodSet("main", 1).Request(corev1.ResourceCPU, "1").Obj()).Obj()),
},
workloadsToUpdate: []*kueue.Workload{
utiltesting.MakeWorkload("w", "").PodSets(*utiltesting.MakePodSet("main", 2).Request(corev1.ResourceCPU, "1").Obj()).
ReclaimablePods(kueue.ReclaimablePod{Name: "main", Count: 1}).
Obj(),
},
wantActiveWorkloads: []string{"/w"},
wantPending: 1,
},
}

for name, test := range tests {
Expand Down

0 comments on commit 40485a2

Please sign in to comment.