Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize heap implementation #1815

Merged
merged 1 commit into from Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/queue/cluster_queue_best_effort_fifo.go
Expand Up @@ -30,7 +30,7 @@ type ClusterQueueBestEffortFIFO struct {
var _ ClusterQueue = &ClusterQueueBestEffortFIFO{}

func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue, wo workload.Ordering) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, queueOrderingFunc(wo), realClock)
cqImpl := newClusterQueueImpl(queueOrderingFunc(wo), realClock)
cqBE := &ClusterQueueBestEffortFIFO{
clusterQueueBase: cqImpl,
}
Expand Down
30 changes: 12 additions & 18 deletions pkg/queue/cluster_queue_impl.go
Expand Up @@ -38,7 +38,7 @@ import (
// clusterQueueBase is an incomplete base implementation of ClusterQueue
// interface. It can be inherited and overwritten by other types.
type clusterQueueBase struct {
heap heap.Heap
heap heap.Heap[workload.Info]
cohort string
namespaceSelector labels.Selector
active bool
Expand All @@ -55,20 +55,23 @@ type clusterQueueBase struct {
// QueueInadmissibleWorkloads is called.
queueInadmissibleCycle int64

lessFunc func(a, b interface{}) bool
lessFunc func(a, b *workload.Info) bool

rwm sync.RWMutex

clock clock.Clock
}

func workloadKey(i *workload.Info) string {
return workload.Key(i.Obj)
}

func newClusterQueueImpl(
keyFunc func(obj interface{}) string,
lessFunc func(a, b interface{}) bool,
lessFunc func(a, b *workload.Info) bool,
clock clock.Clock,
) *clusterQueueBase {
return &clusterQueueBase{
heap: heap.New(keyFunc, lessFunc),
heap: heap.New(workloadKey, lessFunc),
inadmissibleWorkloads: make(map[string]*workload.Info),
queueInadmissibleCycle: -1,
lessFunc: lessFunc,
Expand Down Expand Up @@ -246,8 +249,7 @@ func (c *clusterQueueBase) Pop() *workload.Info {
return nil
}

info := c.heap.Pop()
return info.(*workload.Info)
return c.heap.Pop()
}

func (c *clusterQueueBase) Dump() ([]string, bool) {
Expand All @@ -257,8 +259,7 @@ func (c *clusterQueueBase) Dump() ([]string, bool) {
return nil, false
}
elements := make([]string, c.heap.Len())
for i, e := range c.heap.List() {
info := e.(*workload.Info)
for i, info := range c.heap.List() {
elements[i] = workload.Key(info.Obj)
}
return elements, true
Expand Down Expand Up @@ -288,22 +289,15 @@ func (c *clusterQueueBase) Snapshot() []*workload.Info {
func (c *clusterQueueBase) Info(key string) *workload.Info {
c.rwm.RLock()
defer c.rwm.RUnlock()
info := c.heap.GetByKey(key)
if info == nil {
return nil
}
return info.(*workload.Info)
return c.heap.GetByKey(key)
}

func (c *clusterQueueBase) totalElements() []*workload.Info {
c.rwm.RLock()
defer c.rwm.RUnlock()
totalLen := c.heap.Len() + len(c.inadmissibleWorkloads)
elements := make([]*workload.Info, 0, totalLen)
for _, e := range c.heap.List() {
info := e.(*workload.Info)
elements = append(elements, info)
}
elements = append(elements, c.heap.List()...)
for _, e := range c.inadmissibleWorkloads {
elements = append(elements, e)
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/queue/cluster_queue_impl_test.go
Expand Up @@ -87,7 +87,7 @@ func Test_PushOrUpdate(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, fakeClock)
cq := newClusterQueueImpl(defaultQueueOrderingFunc, fakeClock)

if cq.Pending() != 0 {
t.Error("ClusterQueue should be empty")
Expand Down Expand Up @@ -116,7 +116,7 @@ func Test_PushOrUpdate(t *testing.T) {

func Test_Pop(t *testing.T) {
now := time.Now()
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(now))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(now))
wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj())
wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj())
if cq.Pop() != nil {
Expand All @@ -138,7 +138,7 @@ func Test_Pop(t *testing.T) {
}

func Test_Delete(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj()
cq.PushOrUpdate(workload.NewInfo(wl1))
Expand All @@ -159,19 +159,19 @@ func Test_Delete(t *testing.T) {
}

func Test_Info(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info != nil {
t.Error("workload doesn't exist")
if info := cq.Info(workload.Key(wl)); info != nil {
t.Error("Workload should not exist")
}
cq.PushOrUpdate(workload.NewInfo(wl))
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info == nil {
t.Error("expected workload to exist")
if info := cq.Info(workload.Key(wl)); info == nil {
t.Error("Expected workload to exist")
}
}

func Test_AddFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
queue := &LocalQueue{
items: map[string]*workload.Info{
Expand All @@ -189,7 +189,7 @@ func Test_AddFromLocalQueue(t *testing.T) {
}

func Test_DeleteFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
q := utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj()
qImpl := newLocalQueue(q)
wl1 := utiltesting.MakeWorkload("wl1", "").Queue(q.Name).Obj()
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestClusterQueueImpl(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, fakeClock)
cq := newClusterQueueImpl(defaultQueueOrderingFunc, fakeClock)
err := cq.Update(utiltesting.MakeClusterQueue("cq").
NamespaceSelector(&metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestClusterQueueImpl(t *testing.T) {
}

func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq := newClusterQueueImpl(defaultQueueOrderingFunc, testingclock.NewFakeClock(time.Now()))
cq.namespaceSelector = labels.Everything()
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
cl := utiltesting.NewFakeClient(
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestBackoffWaitingTimeExpired(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, defaultQueueOrderingFunc, fakeClock)
cq := newClusterQueueImpl(defaultQueueOrderingFunc, fakeClock)
got := cq.backoffWaitingTimeExpired(tc.workloadInfo)
if tc.want != got {
t.Errorf("Unexpected result from backoffWaitingTimeExpired\nwant: %v\ngot: %v\n", tc.want, got)
Expand Down
16 changes: 7 additions & 9 deletions pkg/queue/cluster_queue_strict_fifo.go
Expand Up @@ -37,7 +37,7 @@ type ClusterQueueStrictFIFO struct {
var _ ClusterQueue = &ClusterQueueStrictFIFO{}

func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue, wo workload.Ordering) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, queueOrderingFunc(wo), realClock)
cqImpl := newClusterQueueImpl(queueOrderingFunc(wo), realClock)
cqStrict := &ClusterQueueStrictFIFO{
clusterQueueBase: cqImpl,
}
Expand All @@ -50,19 +50,17 @@ func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue, wo workload.Ordering) (Cl
// to sort workloads. The function sorts workloads based on their priority.
// When priorities are equal, it uses the workload's creation or eviction
// time.
func queueOrderingFunc(wo workload.Ordering) func(a, b interface{}) bool {
return func(a, b interface{}) bool {
objA := a.(*workload.Info)
objB := b.(*workload.Info)
p1 := utilpriority.Priority(objA.Obj)
p2 := utilpriority.Priority(objB.Obj)
func queueOrderingFunc(wo workload.Ordering) func(a, b *workload.Info) bool {
return func(a, b *workload.Info) bool {
p1 := utilpriority.Priority(a.Obj)
p2 := utilpriority.Priority(b.Obj)

if p1 != p2 {
return p1 > p2
}

tA := wo.GetQueueOrderTimestamp(objA.Obj)
tB := wo.GetQueueOrderTimestamp(objB.Obj)
tA := wo.GetQueueOrderTimestamp(a.Obj)
tB := wo.GetQueueOrderTimestamp(b.Obj)
return !tB.Before(tA)
}
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/queue/local_queue.go
Expand Up @@ -23,11 +23,6 @@ import (
"sigs.k8s.io/kueue/pkg/workload"
)

func keyFunc(obj interface{}) string {
i := obj.(*workload.Info)
return workload.Key(i.Obj)
}

// Key is the key used to index the queue.
func Key(q *kueue.LocalQueue) string {
return fmt.Sprintf("%s/%s", q.Namespace, q.Name)
Expand Down