Commit 5b87fd8

[DEBUG]

tgross committed May 15, 2024
1 parent b8127cb commit 5b87fd8
Showing 4 changed files with 41 additions and 6 deletions.
21 changes: 18 additions & 3 deletions scheduler/generic_sched.go
@@ -524,10 +524,19 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Capture current time to use as the start time for any rescheduled allocations
now := time.Now()

// DEBUG
// allAllocsForVersion, err := s.state.AllocsByJob(nil, s.job.Namespace, s.job.ID, false)
// if err != nil {
// return err
// }

// END DEBUG

// Have to handle destructive changes first as we need to discount their
// resources. To understand this imagine the resources were reduced and the
// count was scaled up.
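// For instance (hypothetical numbers): memory per alloc halves while the
// count goes 2 -> 4. The two extra placements only fit on the node once
// the destructive updates have discounted the old, larger reservations.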
for _, results := range [][]placementResult{destructive, place} {

for _, missing := range results {
// Get the task group
tg := missing.TaskGroup()
@@ -592,6 +601,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// frees the resources currently used by the previous allocation.
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
prevAllocation := missing.PreviousAllocation()
fmt.Printf("[*] trying to place for prevAllocation: %v\n", prevAllocation)
if stopPrevAlloc {
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "", "")
}
@@ -623,6 +633,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

// Set fields based on if we found an allocation option
if option != nil {
fmt.Printf("[*] found option\n")
resources := &structs.AllocatedResources{
Tasks: option.TaskResources,
TaskLifecycles: option.TaskLifecycles,
@@ -708,6 +719,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
s.plan.AppendAlloc(alloc, downgradedJob)

} else {
fmt.Printf("[*] found no option\n")

// Lazy initialize the failed map
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
@@ -721,9 +734,11 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul

// If we weren't able to find a replacement for the allocation, back
// out the fact that we asked to stop the allocation.
if stopPrevAlloc {
s.plan.PopUpdate(prevAllocation)
}
//if stopPrevAlloc {

// TODO: should we *unconditionally* pop this?
s.plan.PopUpdate(prevAllocation)
//}
}

}
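
Aside: the TODO above asks whether the PopUpdate back-out should happen
unconditionally. A minimal, runnable sketch of the pattern in isolation,
using hypothetical pared-down types (the real AppendStoppedAlloc and
PopUpdate have different signatures): the stop is appended optimistically
before placement, so a failed placement has to undo it.

package main

import "fmt"

// Hypothetical stand-ins for the scheduler's plan types.
type Alloc struct{ ID string }

type Plan struct{ stops []*Alloc }

func (p *Plan) AppendStoppedAlloc(a *Alloc, desc string) {
	p.stops = append(p.stops, a)
}

// PopUpdate backs out a previously appended stop for this alloc, if any.
func (p *Plan) PopUpdate(a *Alloc) {
	for i, s := range p.stops {
		if s == a {
			p.stops = append(p.stops[:i], p.stops[i+1:]...)
			return
		}
	}
}

// placeWithBackout appends the stop optimistically so the previous alloc's
// resources count as free, then backs it out if no placement option is
// found. The debug commit makes the pop unconditional, per the TODO.
func placeWithBackout(p *Plan, prev *Alloc, stopPrev bool, place func() bool) {
	if stopPrev {
		p.AppendStoppedAlloc(prev, "rescheduling")
	}
	if !place() {
		p.PopUpdate(prev)
	}
}

func main() {
	p := &Plan{}
	prev := &Alloc{ID: "prev"}
	placeWithBackout(p, prev, true, func() bool { return false })
	fmt.Println(len(p.stops)) // 0: the optimistic stop was backed out
}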
17 changes: 14 additions & 3 deletions scheduler/generic_sched_test.go
@@ -4704,7 +4704,9 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
h.NextIndex(), []*structs.Evaluation{eval}))

// -----------------------------------
// first reschedule which works as expected
// first reschedule which works with delay as expected

fmt.Println("[0] -------------------------------")

// Process the evaluation and assert we have a plan
must.NoError(t, h.Process(NewServiceScheduler, eval))
@@ -4730,7 +4732,10 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{followupEval}))

// Process the follow-up eval, which results in a replacement and stop
fmt.Println("[1] -------------------------------")

// Follow-up delay "expires", so process the follow-up eval, which results
// in a replacement and stop
must.NoError(t, h.Process(NewServiceScheduler, followupEval))
must.Len(t, 2, h.Plans)
must.MapLen(t, 1, h.Plans[1].NodeUpdate) // stop
@@ -4752,7 +4757,7 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
}

// -----------------------------------
// second reschedule but it blocks
// Replacement alloc fails, second reschedule but it blocks because of delay

alloc, err = h.State.AllocByID(ws, replacementAllocID)
must.NoError(t, err)
@@ -4768,6 +4773,8 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))

fmt.Println("[2] -------------------------------")

// Process the evaluation and assert we have a plan
must.NoError(t, h.Process(NewServiceScheduler, eval))
must.Len(t, 3, h.Plans)
@@ -4795,6 +4802,8 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
node.NodeResources.Memory.MemoryMB = 200
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

fmt.Println("[3] -------------------------------")

// Process the follow-up eval, which results in a stop but not a replacement
must.NoError(t, h.Process(NewServiceScheduler, followupEval))
must.Len(t, 4, h.Plans)
@@ -4816,6 +4825,8 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
node.NodeResources.Memory.MemoryMB = 8000
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

fmt.Println("-------------------------------")

must.NoError(t, h.Process(NewServiceScheduler, blockedEval))
must.Len(t, 5, h.Plans)
must.MapLen(t, 0, h.Plans[4].NodeUpdate) // stop
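
Aside: the delayed follow-up evaluations this test steps through come from
the job's reschedule policy. A hypothetical example (not the test's actual
values, which this diff doesn't show) of the kind of policy that yields a
delayed follow-up eval instead of an immediate replacement, assuming the
usual nomad/structs and time imports:

policy := &structs.ReschedulePolicy{
	Attempts:      0,
	Unlimited:     true,
	Delay:         5 * time.Second, // hypothetical values throughout
	DelayFunction: "constant",
}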
1 change: 1 addition & 0 deletions scheduler/reconcile.go
@@ -463,6 +463,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {

// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
fmt.Printf("[*] untainted: %d rescheduleNow: %d rescheduleLater: %d\n", len(untainted), len(rescheduleNow), len(rescheduleLater))

// If there are allocations reconnecting we need to reconcile them and
// their replacements first because there is specific logic when deciding
8 changes: 8 additions & 0 deletions scheduler/reconcile_util.go
@@ -395,6 +395,8 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time
rescheduleLater := []*delayedRescheduleInfo{}

for _, alloc := range a {
fmt.Printf("[*] checking alloc %q (%s/%s)\n", alloc.ID, alloc.DesiredStatus, alloc.ClientStatus)

// Ignore disconnecting allocs that are already unknown. This can happen
// in the case of canaries that are interrupted by a disconnect.
if isDisconnecting && alloc.ClientStatus == structs.AllocClientStatusUnknown {
@@ -408,21 +410,26 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time
// Only failed or disconnecting allocs should be rescheduled.
// Protects against a bug allowing rescheduling running allocs.
if alloc.NextAllocation != "" && alloc.TerminalStatus() {
// DEBUG
fmt.Printf("[*] ignoring terminal alloc %q w/ NextAllocation %q\n", alloc.ID, alloc.NextAllocation)
continue
}

isUntainted, ignore := shouldFilter(alloc, isBatch)
if isUntainted && !isDisconnecting {
fmt.Printf("[*] isUntainted alloc %q\n", alloc.ID)
untainted[alloc.ID] = alloc
}

if ignore {
fmt.Printf("[*] ignoring alloc %q w/ PreviousAllocation %q\n", alloc.ID, alloc.PreviousAllocation)
continue
}

eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment, isDisconnecting)
if eligibleNow {
rescheduleNow[alloc.ID] = alloc
fmt.Printf("[*] rescheduleNow alloc %q\n", alloc.ID)
continue
}

@@ -432,6 +439,7 @@ func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time

if eligibleLater {
rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, alloc, rescheduleTime})
fmt.Printf("[*] rescheduleLater alloc %q\n", alloc.ID)
}

}
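
Aside: as a reading aid for the debug output above, a condensed, runnable
sketch of how this loop buckets each alloc, with hypothetical flattened
inputs standing in for the results of shouldFilter and
updateByReschedulable. Note an alloc can be counted as untainted and still
be queued for a later reschedule.

package main

import "fmt"

// Hypothetical flattened inputs; not the real scheduler types.
type allocInfo struct {
	terminal, hasNextAlloc             bool
	isUntainted, isDisconnecting       bool
	ignore, eligibleNow, eligibleLater bool
}

// classify condenses the loop's bucketing into one decision per alloc.
func classify(a allocInfo) (untainted, now, later bool) {
	if a.terminal && a.hasNextAlloc {
		return false, false, false // already replaced; skipped outright
	}
	if a.isUntainted && !a.isDisconnecting {
		untainted = true
	}
	if a.ignore {
		return untainted, false, false
	}
	if a.eligibleNow {
		return untainted, true, false
	}
	return untainted, false, a.eligibleLater
}

func main() {
	u, now, later := classify(allocInfo{isUntainted: true, eligibleLater: true})
	fmt.Println(u, now, later) // true false true
}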
