Skip to content

Commit

Permalink
Generalize heap (kubernetes-sigs#1815)
Browse files Browse the repository at this point in the history
Change-Id: Iea7c7d65a4a380a6e310099979ef3a05ecff0b75
  • Loading branch information
alculquicondor authored and vsoch committed Apr 18, 2024
1 parent 3ea9d7c commit dc48917
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 137 deletions.
2 changes: 1 addition & 1 deletion pkg/queue/cluster_queue_best_effort_fifo.go
Expand Up @@ -30,7 +30,7 @@ type ClusterQueueBestEffortFIFO struct {
var _ ClusterQueue = &ClusterQueueBestEffortFIFO{}

func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue, wo workload.Ordering) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, queueOrderingFunc(wo), realClock)
cqImpl := newClusterQueueImpl(queueOrderingFunc(wo), realClock)
cqBE := &ClusterQueueBestEffortFIFO{
clusterQueueBase: cqImpl,
}
Expand Down
30 changes: 12 additions & 18 deletions pkg/queue/cluster_queue_impl.go
Expand Up @@ -38,7 +38,7 @@ import (
// clusterQueueBase is an incomplete base implementation of ClusterQueue
// interface. It can be inherited and overwritten by other types.
type clusterQueueBase struct {
heap heap.Heap
heap heap.Heap[workload.Info]
cohort string
namespaceSelector labels.Selector
active bool
Expand All @@ -55,20 +55,23 @@ type clusterQueueBase struct {
// QueueInadmissibleWorkloads is called.
queueInadmissibleCycle int64

lessFunc func(a, b interface{}) bool
lessFunc func(a, b *workload.Info) bool

rwm sync.RWMutex

clock clock.Clock
}

func workloadKey(i *workload.Info) string {
return workload.Key(i.Obj)
}

func newClusterQueueImpl(
keyFunc func(obj interface{}) string,
lessFunc func(a, b interface{}) bool,
lessFunc func(a, b *workload.Info) bool,
clock clock.Clock,
) *clusterQueueBase {
return &clusterQueueBase{
heap: heap.New(keyFunc, lessFunc),
heap: heap.New(workloadKey, lessFunc),
inadmissibleWorkloads: make(map[string]*workload.Info),
queueInadmissibleCycle: -1,
lessFunc: lessFunc,
Expand Down Expand Up @@ -246,8 +249,7 @@ func (c *clusterQueueBase) Pop() *workload.Info {
return nil
}

info := c.heap.Pop()
return info.(*workload.Info)
return c.heap.Pop()
}

func (c *clusterQueueBase) Dump() ([]string, bool) {
Expand All @@ -257,8 +259,7 @@ func (c *clusterQueueBase) Dump() ([]string, bool) {
return nil, false
}
elements := make([]string, c.heap.Len())
for i, e := range c.heap.List() {
info := e.(*workload.Info)
for i, info := range c.heap.List() {
elements[i] = workload.Key(info.Obj)
}
return elements, true
Expand Down Expand Up @@ -288,22 +289,15 @@ func (c *clusterQueueBase) Snapshot() []*workload.Info {
func (c *clusterQueueBase) Info(key string) *workload.Info {
c.rwm.RLock()
defer c.rwm.RUnlock()
info := c.heap.GetByKey(key)
if info == nil {
return nil
}
return info.(*workload.Info)
return c.heap.GetByKey(key)
}

func (c *clusterQueueBase) totalElements() []*workload.Info {
c.rwm.RLock()
defer c.rwm.RUnlock()
totalLen := c.heap.Len() + len(c.inadmissibleWorkloads)
elements := make([]*workload.Info, 0, totalLen)
for _, e := range c.heap.List() {
info := e.(*workload.Info)
elements = append(elements, info)
}
elements = append(elements, c.heap.List()...)
for _, e := range c.inadmissibleWorkloads {
elements = append(elements, e)
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/queue/cluster_queue_impl_test.go
Expand Up @@ -87,7 +87,7 @@ func Test_PushOrUpdate(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, fakeClock)
cq := newClusterQueueImpl(defaultQueueOrderingFunc, fakeClock)

if cq.Pending() != 0 {
t.Error("ClusterQueue should be empty")
Expand Down Expand Up @@ -116,7 +116,7 @@ func Test_PushOrUpdate(t *testing.T) {

func Test_Pop(t *testing.T) {
now := time.Now()
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(now))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(now))
wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj())
wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj())
if cq.Pop() != nil {
Expand All @@ -138,7 +138,7 @@ func Test_Pop(t *testing.T) {
}

func Test_Delete(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj()
cq.PushOrUpdate(workload.NewInfo(wl1))
Expand All @@ -159,19 +159,19 @@ func Test_Delete(t *testing.T) {
}

func Test_Info(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info != nil {
t.Error("workload doesn't exist")
if info := cq.Info(workload.Key(wl)); info != nil {
t.Error("Workload should not exist")
}
cq.PushOrUpdate(workload.NewInfo(wl))
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info == nil {
t.Error("expected workload to exist")
if info := cq.Info(workload.Key(wl)); info == nil {
t.Error("Expected workload to exist")
}
}

func Test_AddFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
queue := &LocalQueue{
items: map[string]*workload.Info{
Expand All @@ -189,7 +189,7 @@ func Test_AddFromLocalQueue(t *testing.T) {
}

func Test_DeleteFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
q := utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj()
qImpl := newLocalQueue(q)
wl1 := utiltesting.MakeWorkload("wl1", "").Queue(q.Name).Obj()
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestClusterQueueImpl(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, fakeClock)
cq := newClusterQueueImpl(defaultQueueOrderingFunc, fakeClock)
err := cq.Update(utiltesting.MakeClusterQueue("cq").
NamespaceSelector(&metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestClusterQueueImpl(t *testing.T) {
}

func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq.namespaceSelector = labels.Everything()
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
cl := utiltesting.NewFakeClient(
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestBackoffWaitingTimeExpired(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, fakeClock)
cq := newClusterQueueImpl(defaultQueueOrderingFunc, fakeClock)
got := cq.backoffWaitingTimeExpired(tc.workloadInfo)
if tc.want != got {
t.Errorf("Unexpected result from backoffWaitingTimeExpired\nwant: %v\ngot: %v\n", tc.want, got)
Expand Down
16 changes: 7 additions & 9 deletions pkg/queue/cluster_queue_strict_fifo.go
Expand Up @@ -37,7 +37,7 @@ type ClusterQueueStrictFIFO struct {
var _ ClusterQueue = &ClusterQueueStrictFIFO{}

func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue, wo workload.Ordering) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, queueOrderingFunc(wo), realClock)
cqImpl := newClusterQueueImpl(queueOrderingFunc(wo), realClock)
cqStrict := &ClusterQueueStrictFIFO{
clusterQueueBase: cqImpl,
}
Expand All @@ -50,19 +50,17 @@ func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue, wo workload.Ordering) (Cl
// to sort workloads. The function sorts workloads based on their priority.
// When priorities are equal, it uses the workload's creation or eviction
// time.
func queueOrderingFunc(wo workload.Ordering) func(a, b interface{}) bool {
return func(a, b interface{}) bool {
objA := a.(*workload.Info)
objB := b.(*workload.Info)
p1 := utilpriority.Priority(objA.Obj)
p2 := utilpriority.Priority(objB.Obj)
func queueOrderingFunc(wo workload.Ordering) func(a, b *workload.Info) bool {
return func(a, b *workload.Info) bool {
p1 := utilpriority.Priority(a.Obj)
p2 := utilpriority.Priority(b.Obj)

if p1 != p2 {
return p1 > p2
}

tA := wo.GetQueueOrderTimestamp(objA.Obj)
tB := wo.GetQueueOrderTimestamp(objB.Obj)
tA := wo.GetQueueOrderTimestamp(a.Obj)
tB := wo.GetQueueOrderTimestamp(b.Obj)
return !tB.Before(tA)
}
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/queue/local_queue.go
Expand Up @@ -23,11 +23,6 @@ import (
"sigs.k8s.io/kueue/pkg/workload"
)

func keyFunc(obj interface{}) string {
i := obj.(*workload.Info)
return workload.Key(i.Obj)
}

// Key is the key used to index the queue.
func Key(q *kueue.LocalQueue) string {
return fmt.Sprintf("%s/%s", q.Namespace, q.Name)
Expand Down

0 comments on commit dc48917

Please sign in to comment.