Do not clean up excess pods while waiting to observe previous cleanups (#1623)

* Do not clean up excess pods while waiting to observe previous cleanups

Change-Id: Icc748dcfb47591144da91e6ffad59676f5df9ad2

* Review

Change-Id: Ie63c4d709fdabeb84d7463dbf66696f9a8f387b0

* More resilient UID observation on Pod delete

Change-Id: Ie418ea7788d626ff8bd3a2adcc2c25086ea38fe8
alculquicondor committed Jan 23, 2024
1 parent 46c07f0 commit 44adc22
Showing 5 changed files with 443 additions and 22 deletions.
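The mechanism, in short: before the reconciler finalizes and deletes excess pods of a group, it records their UIDs in an expectations store; the pod event handlers mark each UID as observed when the corresponding deletion (or a pod already missing the finalizer) shows up; and further cleanup for that group is skipped until every recorded UID has been observed. The following is an illustrative sketch of that cycle, not part of the diff, assuming it lives in the same pod package as the new expectations.go below (the store and its constructor are unexported):

package pod

import (
	"github.com/go-logr/logr"
	"k8s.io/apimachinery/pkg/types"
)

// exampleExpectationCycle is illustrative only; it walks the
// expect/observe/satisfied cycle that this commit introduces.
func exampleExpectationCycle() {
	log := logr.Discard()
	group := types.NamespacedName{Namespace: "ns", Name: "group-a"}
	exp := newUIDExpectations("finalizedPods")

	// Before finalizing and deleting excess pods, record their UIDs as pending.
	exp.ExpectUIDs(log, group, []types.UID{"uid-1", "uid-2"})

	// While any UID is still pending, cleanupExcessPods bails out with
	// errPendingOps instead of removing more pods.
	_ = exp.Satisfied(log, group) // false

	// The event handlers mark UIDs observed as deletions (or pods that have
	// already lost the finalizer) are seen.
	exp.ObservedUID(log, group, "uid-1")
	exp.ObservedUID(log, group, "uid-2")

	// Once all pending UIDs were observed, cleanup may proceed again.
	_ = exp.Satisfied(log, group) // true
}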
30 changes: 27 additions & 3 deletions pkg/controller/jobs/pod/event_handlers.go
@@ -3,6 +3,7 @@ package pod
import (
"context"
"fmt"
"slices"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,7 +44,9 @@ func reconcileRequestForPod(p *corev1.Pod) reconcile.Request {

// podEventHandler will convert reconcile requests for pods in group from "<namespace>/<pod-name>" to
// "group/<namespace>/<group-name>".
type podEventHandler struct{}
type podEventHandler struct {
cleanedUpPodsExpectations *expectationsStore
}

func (h *podEventHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
h.queueReconcileForPod(ctx, e.Object, q)
@@ -54,11 +57,25 @@ func (h *podEventHandler) Update(ctx context.Context, e event.UpdateEvent, q wor
}

func (h *podEventHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
h.queueReconcileForPod(ctx, e.Object, q)
p, ok := e.Object.(*corev1.Pod)
if !ok {
return
}

log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))

if g, isGroup := p.Labels[GroupNameLabel]; isGroup {
// If the watch was temporarily unavailable, it is possible that the object reported in the event still
// has a finalizer, but we can consider this Pod cleaned up, as it is being deleted.
h.cleanedUpPodsExpectations.ObservedUID(log, types.NamespacedName{Namespace: p.Namespace, Name: g}, p.UID)
}

log.V(5).Info("Queueing reconcile for pod")

q.Add(reconcileRequestForPod(p))
}

func (h *podEventHandler) Generic(ctx context.Context, e event.GenericEvent, q workqueue.RateLimitingInterface) {
h.queueReconcileForPod(ctx, e.Object, q)
}

func (h *podEventHandler) queueReconcileForPod(ctx context.Context, object client.Object, q workqueue.RateLimitingInterface) {
@@ -68,6 +85,13 @@ func (h *podEventHandler) queueReconcileForPod(ctx context.Context, object clien
}

log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))

if g, isGroup := p.Labels[GroupNameLabel]; isGroup {
if !slices.Contains(p.Finalizers, PodFinalizer) {
h.cleanedUpPodsExpectations.ObservedUID(log, types.NamespacedName{Namespace: p.Namespace, Name: g}, p.UID)
}
}

log.V(5).Info("Queueing reconcile for pod")

q.Add(reconcileRequestForPod(p))
84 changes: 84 additions & 0 deletions pkg/controller/jobs/pod/expectations.go
@@ -0,0 +1,84 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"sync"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
)

type uids = sets.Set[types.UID]

// expectationsStore contains UIDs for which we are waiting to observe some change through event handlers.
type expectationsStore struct {
sync.Mutex
name string

store map[types.NamespacedName]uids
}

func newUIDExpectations(name string) *expectationsStore {
return &expectationsStore{
name: name,
store: make(map[types.NamespacedName]uids),
}
}

func (e *expectationsStore) ExpectUIDs(log logr.Logger, key types.NamespacedName, UIDs []types.UID) {
log.V(3).Info("Expecting UIDs", "store", e.name, "key", key, "uids", UIDs)
expectedUIDs := sets.New[types.UID](UIDs...)
e.Lock()
defer e.Unlock()

stored, found := e.store[key]
if !found {
e.store[key] = expectedUIDs
} else {
e.store[key] = stored.Union(expectedUIDs)
}
}

func (e *expectationsStore) ObservedUID(log logr.Logger, key types.NamespacedName, uid types.UID) {
log.V(3).Info("Observed UID", "store", e.name, "key", key, "uid", uid)
e.Lock()
defer e.Unlock()

stored, found := e.store[key]
if !found {
return
}
stored.Delete(uid)

// clean up key if empty.
if stored.Len() == 0 {
delete(e.store, key)
}
}

func (e *expectationsStore) Satisfied(log logr.Logger, key types.NamespacedName) bool {
e.Lock()
_, found := e.store[key]
e.Unlock()

if logV := log.V(4); logV.Enabled() {
log.V(4).Info("Retrieved satisfied expectations", "store", e.name, "key", key, "satisfied", !found)
}
return !found
}
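One subtlety of the store above that the event handlers rely on: ObservedUID for a key or UID that was never expected is a no-op, and Satisfied reports true whenever a key has no pending entry (empty sets are deleted eagerly). That is why the handlers can report observations unconditionally for any group pod they see. A tiny sketch of those defaults, assuming the same package and imports as the earlier sketch:

// exampleExpectationDefaults is illustrative only; it shows the store's
// behavior when nothing was ever expected for a key.
func exampleExpectationDefaults() {
	log := logr.Discard()
	key := types.NamespacedName{Namespace: "ns", Name: "group-b"}
	exp := newUIDExpectations("finalizedPods")

	// Observing a UID that was never expected does nothing.
	exp.ObservedUID(log, key, "never-expected")

	// A key with no pending expectations is trivially satisfied.
	_ = exp.Satisfied(log, key) // true
}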
96 changes: 96 additions & 0 deletions pkg/controller/jobs/pod/expectations_test.go
@@ -0,0 +1,96 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"testing"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/kueue/pkg/util/parallelize"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

type keyUIDs struct {
key types.NamespacedName
uids []types.UID
}

func TestExpectations(t *testing.T) {
ctx, log := utiltesting.ContextWithLog(t)
initial := []keyUIDs{
{
key: types.NamespacedName{Name: "g1"},
uids: []types.UID{"a", "b", "c"},
},
{
key: types.NamespacedName{Name: "g2"},
uids: []types.UID{"x", "y", "z"},
},
{
key: types.NamespacedName{Name: "g3"},
uids: []types.UID{"a", "b", "c", "x", "y", "z"},
},
}
expectations := newUIDExpectations("test")
err := parallelize.Until(ctx, len(initial), func(i int) error {
e := initial[i]
expectations.ExpectUIDs(log, e.key, e.uids)
return nil
})
if err != nil {
t.Fatalf("Inserting initial UIDs: %v", err)
}

observe := []keyUIDs{
{
key: types.NamespacedName{Name: "g1"},
uids: []types.UID{"a", "b", "c"},
},
{
key: types.NamespacedName{Name: "g2"},
uids: []types.UID{"x", "y", "z", "x", "y", "z"},
},
{
key: types.NamespacedName{Name: "g3"},
uids: []types.UID{"a", "b", "c", "x", "y"},
},
}
err = parallelize.Until(ctx, len(observe), func(i int) error {
e := observe[i]
return parallelize.Until(ctx, len(e.uids), func(j int) error {
expectations.ObservedUID(log, e.key, e.uids[j])
return nil
})
})
if err != nil {
t.Fatalf("Observing UIDs: %v", err)
}

wantSatisfied := map[types.NamespacedName]bool{
{Name: "g1"}: true,
{Name: "g2"}: true,
{Name: "g3"}: false,
}
for key, want := range wantSatisfied {
got := expectations.Satisfied(log, key)
if got != want {
t.Errorf("Got key %s satisfied: %t, want %t", key, got, want)
}
}
}
58 changes: 48 additions & 10 deletions pkg/controller/jobs/pod/pod_controller.go
@@ -50,6 +50,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/parallelize"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
)

const (
@@ -65,6 +66,7 @@
var (
gvk = corev1.SchemeGroupVersion.WithKind("Pod")
errIncorrectReconcileRequest = fmt.Errorf("event handler error: got a single pod reconcile request for a pod group")
errPendingOps = jobframework.UnretryableError("waiting to observe previous operations on pods")
)

func init() {
@@ -88,31 +90,36 @@

type Reconciler struct {
*jobframework.JobReconciler
expectationsStore *expectationsStore
}

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return r.ReconcileGenericJob(ctx, req, &Pod{})
return r.ReconcileGenericJob(ctx, req, &Pod{excessPodExpectations: r.expectationsStore})
}

func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Watches(&corev1.Pod{}, &podEventHandler{}).Named("v1_pod").
Watches(&corev1.Pod{}, &podEventHandler{cleanedUpPodsExpectations: r.expectationsStore}).Named("v1_pod").
Watches(&kueue.Workload{}, &workloadHandler{}).
Complete(r)
}

func NewReconciler(c client.Client, record record.EventRecorder, opts ...jobframework.Option) jobframework.JobReconcilerInterface {
return &Reconciler{
JobReconciler: jobframework.NewReconciler(c, record, opts...),
JobReconciler: jobframework.NewReconciler(c, record, opts...),
expectationsStore: newUIDExpectations("finalizedPods"),
}
}

type Pod struct {
pod corev1.Pod
isFound bool
isGroup bool
unretriableGroup *bool
list corev1.PodList
pod corev1.Pod
key types.NamespacedName
isFound bool
isGroup bool
unretriableGroup *bool
list corev1.PodList
excessPodExpectations *expectationsStore
satisfiedExcessPods bool
}

var (
@@ -576,6 +583,11 @@ func (p *Pod) Load(ctx context.Context, c client.Client, key *types.NamespacedNa
p.isGroup = true

key.Namespace = nsKey[1]
p.key = *key

// Check the expectations before listing pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store.
p.satisfiedExcessPods = p.excessPodExpectations.Satisfied(ctrl.LoggerFrom(ctx), *key)

if err := c.List(ctx, &p.list, client.MatchingLabels{
GroupNameLabel: key.Name,
@@ -705,14 +717,34 @@ func (p *Pod) cleanupExcessPods(ctx context.Context, c client.Client, totalCount
if extraPodsCount <= 0 {
return nil
}
// Do not clean up more pods until observing previous operations
if !p.satisfiedExcessPods {
return errPendingOps
}

// Sort active pods by creation timestamp
sort.Slice(activePods, func(i, j int) bool {
return activePods[i].ObjectMeta.CreationTimestamp.Before(&activePods[j].ObjectMeta.CreationTimestamp)
pi := &activePods[i]
pj := &activePods[j]
iFin := slices.Contains(pi.Finalizers, PodFinalizer)
jFin := slices.Contains(pj.Finalizers, PodFinalizer)
// Prefer to keep pods that have a finalizer.
if iFin != jFin {
return iFin
}
iGated := gateIndex(pi) != gateNotFound
jGated := gateIndex(pj) != gateNotFound
// Prefer to keep pods that aren't gated.
if iGated != jGated {
return !iGated
}
return pi.ObjectMeta.CreationTimestamp.Before(&pj.ObjectMeta.CreationTimestamp)
})

// Extract all the latest created extra pods
extraPods := activePods[len(activePods)-extraPodsCount:]
extraPodsUIDs := utilslices.Map(extraPods, func(p *corev1.Pod) types.UID { return p.UID })
p.excessPodExpectations.ExpectUIDs(log, p.key, extraPodsUIDs)

// Finalize and delete the active pods created last

@@ -721,12 +753,18 @@
if controllerutil.RemoveFinalizer(&pod, PodFinalizer) {
log.V(3).Info("Finalizing excess pod in group", "excessPod", klog.KObj(&pod))
if err := c.Update(ctx, &pod); err != nil {
// We won't observe this cleanup in the event handler.
p.excessPodExpectations.ObservedUID(log, p.key, pod.UID)
return err
}
}
if pod.ObjectMeta.DeletionTimestamp.IsZero() {
log.V(3).Info("Deleting excess pod in group", "excessPod", klog.KObj(&pod))
return c.Delete(ctx, &pod)
if err := c.Delete(ctx, &pod); err != nil {
// We won't observe this cleanup in the event handler.
p.excessPodExpectations.ObservedUID(log, p.key, pod.UID)
return err
}
}
return nil
})
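To make the new keep/remove ordering concrete: excess pods are taken from the end of the sorted slice, so pods that already lost the Kueue finalizer are removed first, then gated pods, and only then the most recently created ones. Below is a simplified, self-contained sketch of just the finalizer and timestamp criteria (the gating check is omitted, and the pod names, times, and finalizer value are made up for illustration):

package main

import (
	"fmt"
	"slices"
	"sort"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// podFinalizer is a stand-in for the package's PodFinalizer constant.
const podFinalizer = "kueue.x-k8s.io/managed"

func main() {
	now := time.Now()
	pods := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "old-finalized", Finalizers: []string{podFinalizer},
			CreationTimestamp: metav1.NewTime(now.Add(-2 * time.Hour))}},
		{ObjectMeta: metav1.ObjectMeta{Name: "new-finalized", Finalizers: []string{podFinalizer},
			CreationTimestamp: metav1.NewTime(now.Add(-1 * time.Hour))}},
		{ObjectMeta: metav1.ObjectMeta{Name: "unfinalized",
			CreationTimestamp: metav1.NewTime(now.Add(-3 * time.Hour))}},
	}

	sort.Slice(pods, func(i, j int) bool {
		iFin := slices.Contains(pods[i].Finalizers, podFinalizer)
		jFin := slices.Contains(pods[j].Finalizers, podFinalizer)
		// Prefer to keep pods that still carry the finalizer.
		if iFin != jFin {
			return iFin
		}
		// Otherwise keep the older pods.
		return pods[i].CreationTimestamp.Before(&pods[j].CreationTimestamp)
	})

	// With one excess pod, the last element after sorting is removed:
	// "unfinalized", even though it is the oldest pod in the group.
	fmt.Println(pods[len(pods)-1].Name)
}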
