diff --git a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go index 3f69b102bf..7101ec82a3 100644 --- a/pkg/scheduler/api/resource_info.go +++ b/pkg/scheduler/api/resource_info.go @@ -437,38 +437,42 @@ func (r *Resource) Equal(rr *Resource, defaultValue DimensionDefaultValue) bool } // Diff calculate the difference between two resource object -func (r *Resource) Diff(rr *Resource) (*Resource, *Resource) { +// Note: if `defaultValue` equals `Infinity`, the difference between two values will be `Infinity`, marked as -1 +func (r *Resource) Diff(rr *Resource, defaultValue DimensionDefaultValue) (*Resource, *Resource) { + leftRes := r.Clone() + rightRes := rr.Clone() increasedVal := EmptyResource() decreasedVal := EmptyResource() - if r.MilliCPU > rr.MilliCPU { - increasedVal.MilliCPU += r.MilliCPU - rr.MilliCPU + r.setDefaultValue(leftRes, rightRes, defaultValue) + + if leftRes.MilliCPU > rightRes.MilliCPU { + increasedVal.MilliCPU = leftRes.MilliCPU - rightRes.MilliCPU } else { - decreasedVal.MilliCPU += rr.MilliCPU - r.MilliCPU + decreasedVal.MilliCPU = rightRes.MilliCPU - leftRes.MilliCPU } - if r.Memory > rr.Memory { - increasedVal.Memory += r.Memory - rr.Memory + if leftRes.Memory > rightRes.Memory { + increasedVal.Memory = leftRes.Memory - rightRes.Memory } else { - decreasedVal.Memory += rr.Memory - r.Memory + decreasedVal.Memory = rightRes.Memory - leftRes.Memory } - for rName, rQuant := range r.ScalarResources { - rrQuant, ok := rr.ScalarResources[rName] - - if !ok { - rrQuant = 0 + increasedVal.ScalarResources = make(map[v1.ResourceName]float64, 0) + decreasedVal.ScalarResources = make(map[v1.ResourceName]float64, 0) + for lName, lQuant := range leftRes.ScalarResources { + rQuant, _ := rightRes.ScalarResources[lName] + if lQuant == -1 { + increasedVal.ScalarResources[lName] = -1 + continue } - - if rQuant > rrQuant { - if increasedVal.ScalarResources == nil { - increasedVal.ScalarResources = map[v1.ResourceName]float64{} - } - increasedVal.ScalarResources[rName] += rQuant - rrQuant + if rQuant == -1 { + decreasedVal.ScalarResources[lName] = -1 + continue + } + if lQuant > rQuant { + increasedVal.ScalarResources[lName] = lQuant - rQuant } else { - if decreasedVal.ScalarResources == nil { - decreasedVal.ScalarResources = map[v1.ResourceName]float64{} - } - decreasedVal.ScalarResources[rName] += rrQuant - rQuant + decreasedVal.ScalarResources[lName] = rQuant - lQuant } } diff --git a/pkg/scheduler/api/resource_info_test.go b/pkg/scheduler/api/resource_info_test.go index bc17d20ed6..4b3fb1402d 100644 --- a/pkg/scheduler/api/resource_info_test.go +++ b/pkg/scheduler/api/resource_info_test.go @@ -289,6 +289,179 @@ func TestSubResource(t *testing.T) { } } +func TestDiff(t *testing.T) { + testsForDefaultZero := []struct { + resource1 *Resource + resource2 *Resource + expectedIncreased *Resource + expectedDecreased *Resource + }{ + { + resource1: &Resource{}, + resource2: &Resource{}, + expectedIncreased: &Resource{ + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + expectedDecreased: &Resource{ + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + }, + { + resource1: &Resource{ + MilliCPU: 1000, + Memory: 2000, + }, + resource2: &Resource{}, + expectedIncreased: &Resource{ + MilliCPU: 1000, + Memory: 2000, + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + expectedDecreased: &Resource{ + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + }, + { + resource1: &Resource{}, + resource2: &Resource{ + MilliCPU: 1000, + Memory: 2000, + }, + expectedIncreased: &Resource{ + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + expectedDecreased: &Resource{ + MilliCPU: 1000, + Memory: 2000, + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + }, + { + resource1: &Resource{ + MilliCPU: 1000, + Memory: 2000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + resource2: &Resource{ + MilliCPU: 2000, + Memory: 1000, + }, + expectedIncreased: &Resource{ + Memory: 1000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + expectedDecreased: &Resource{ + MilliCPU: 1000, + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + }, + { + resource1: &Resource{ + MilliCPU: 2000, + Memory: 1000, + }, + resource2: &Resource{ + MilliCPU: 1000, + Memory: 2000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + expectedIncreased: &Resource{ + MilliCPU: 1000, + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + expectedDecreased: &Resource{ + Memory: 1000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + }, + { + resource1: &Resource{ + MilliCPU: 1000, + Memory: 2000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 3000}, + }, + resource2: &Resource{ + MilliCPU: 2000, + Memory: 1000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + expectedIncreased: &Resource{ + Memory: 1000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 2000}, + }, + expectedDecreased: &Resource{ + MilliCPU: 1000, + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + }, + } + + testsForDefaultInfinity := []struct { + resource1 *Resource + resource2 *Resource + expectedIncreased *Resource + expectedDecreased *Resource + }{ + { + resource1: &Resource{ + MilliCPU: 1000, + Memory: 2000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + resource2: &Resource{ + MilliCPU: 2000, + Memory: 1000, + }, + expectedIncreased: &Resource{ + Memory: 1000, + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + expectedDecreased: &Resource{ + MilliCPU: 1000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": -1}, + }, + }, + { + resource1: &Resource{ + MilliCPU: 2000, + Memory: 1000, + }, + resource2: &Resource{ + MilliCPU: 1000, + Memory: 2000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000}, + }, + expectedIncreased: &Resource{ + MilliCPU: 1000, + ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": -1}, + }, + expectedDecreased: &Resource{ + Memory: 1000, + ScalarResources: make(map[v1.ResourceName]float64, 0), + }, + }, + } + + for _, test := range testsForDefaultZero { + increased, decreased := test.resource1.Diff(test.resource2, Zero) + if !reflect.DeepEqual(test.expectedIncreased, increased) { + t.Errorf("expected: %#v, got: %#v", test.expectedIncreased, increased) + } + if !reflect.DeepEqual(test.expectedDecreased, decreased) { + t.Errorf("expected: %#v, got: %#v", test.expectedDecreased, decreased) + } + } + for _, test := range testsForDefaultInfinity { + increased, decreased := test.resource1.Diff(test.resource2, Infinity) + if !reflect.DeepEqual(test.expectedIncreased, increased) { + t.Errorf("expected: %#v, got: %#v", test.expectedIncreased, increased) + } + if !reflect.DeepEqual(test.expectedDecreased, decreased) { + t.Errorf("expected: %#v, got: %#v", test.expectedDecreased, decreased) + } + } +} + func TestLess(t *testing.T) { testsForDefaultZero := []struct { resource1 *Resource diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 17b988acdb..984bb0900d 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -178,7 +178,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("The attributes of queue <%s> in proportion: deserved <%v>, allocate <%v>, request <%v>, share <%0.2f>", attr.name, attr.deserved, attr.allocated, attr.request, attr.share) - increased, decreased := attr.deserved.Diff(oldDeserved) + increased, decreased := attr.deserved.Diff(oldDeserved, api.Zero) increasedDeserved.Add(increased) decreasedDeserved.Add(decreased) @@ -254,7 +254,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { underUsedResNames := api.ResourceNameList{} attr := pp.queueOpts[queue.UID] - _, underUsedResource := attr.allocated.Diff(attr.deserved) + _, underUsedResource := attr.allocated.Diff(attr.deserved, api.Zero) if underUsedResource.MilliCPU >= api.GetMinResource() { underUsedResNames = append(underUsedResNames, v1.ResourceCPU) } @@ -268,7 +268,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { queue.Name, attr.deserved, attr.allocated, attr.share, underUsedResNames) return underUsedResNames - }) ssn.AddJobEnqueueableFn(pp.Name(), func(obj interface{}) int {