Skip to content

Commit

Permalink
scheduler: RescheduleTracker dropped if follow-up eval fails placements
Browse files Browse the repository at this point in the history
When an allocation fails it triggers an evaluation. The evaluation is
processed and the scheduler sees it needs to reschedule, which
triggers a follow-up eval. The follow-up eval creates a plan to
`(stop 1) (place 1)`. The replacement alloc has a `RescheduleTracker`
(or gets its `RescheduleTracker` updated).

But in the case where the follow-up eval can't place all allocs (there
aren't enough resources), it can create a partial plan to
`(stop 1) (place 0)`. It then creates a blocked eval. The plan applier
stops the failed alloc. Then when the blocked eval is processed, the
job is missing an allocation, so the scheduler creates a new
allocation. This allocation is _not_ a replacement from the
perspective of the scheduler, so it's not handed off a
`RescheduleTracker`.
  • Loading branch information
tgross committed May 15, 2024
1 parent 6886edf commit 0533b7c
Showing 1 changed file with 180 additions and 0 deletions.
180 changes: 180 additions & 0 deletions scheduler/generic_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4654,6 +4654,186 @@ func TestServiceSched_Reschedule_MultipleNow(t *testing.T) {
assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts
}

func TestServiceSched_BlockedReschedule(t *testing.T) {
ci.Parallel(t)

h := NewHarness(t)
node := mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Generate a fake job with an allocation and an update policy.
job := mock.Job()
job.TaskGroups[0].Count = 1
delayDuration := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 3,
Interval: 15 * time.Minute,
Delay: delayDuration,
MaxDelay: 1 * time.Minute,
DelayFunction: "constant",
}
tgName := job.TaskGroups[0].Name
now := time.Now()

require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

Check failure on line 4678 in scheduler/generic_sched_test.go

View workflow job for this annotation

GitHub Actions / tests-groups (quick)

not enough arguments in call to h.State.UpsertJob

Check failure on line 4678 in scheduler/generic_sched_test.go

View workflow job for this annotation

GitHub Actions / tests-groups (quick)

not enough arguments in call to h.State.UpsertJob

alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
failedAllocID := alloc.ID

require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Allocation{alloc}))

// Create a mock evaluation for the allocation failure
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))

// -----------------------------------
// first reschedule which works as expected

// Process the evaluation and assert we have a plan
require.NoError(t, h.Process(NewServiceScheduler, eval))
require.Len(t, h.Plans, 1)
require.Len(t, h.Plans[0].NodeUpdate, 0) // stop
require.Len(t, h.Plans[0].NodeAllocation, 1) // place

// Lookup the allocations by JobID and verify no new allocs created
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
require.Len(t, out, 1)

// Verify follow-up eval was created for the failed alloc
// and write the eval to the state store
alloc, err = h.State.AllocByID(ws, failedAllocID)
require.NoError(t, err)
require.NotEmpty(t, alloc.FollowupEvalID)
require.Len(t, h.CreateEvals, 1)
followupEval := h.CreateEvals[0]
require.Equal(t, structs.EvalStatusPending, followupEval.Status)
require.Equal(t, now.Add(delayDuration), followupEval.WaitUntil)
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{followupEval}))

// Process the follow-up eval, which results in a replacement and stop
require.NoError(t, h.Process(NewServiceScheduler, followupEval))
require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeUpdate, 1) // stop
require.Len(t, h.Plans[1].NodeAllocation, 1) // place

out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
require.Len(t, out, 2)

var replacementAllocID string
for _, alloc := range out {
if alloc.ID != failedAllocID {
require.NotNil(t, alloc.RescheduleTracker,
"replacement alloc should have reschedule tracker")
require.Len(t, alloc.RescheduleTracker.Events, 1)
replacementAllocID = alloc.ID
break
}
}

// -----------------------------------
// second reschedule but it blocks

alloc, err = h.State.AllocByID(ws, replacementAllocID)
require.NoError(t, err)
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Allocation{alloc}))

// Create a mock evaluation for the allocation failure
eval.ID = uuid.Generate()
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation and assert we have a plan
require.NoError(t, h.Process(NewServiceScheduler, eval))
require.Len(t, h.Plans, 3)
require.Len(t, h.Plans[2].NodeUpdate, 0) // stop
require.Len(t, h.Plans[2].NodeAllocation, 1) // place

// Lookup the allocations by JobID and verify no new allocs created
out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
require.Len(t, out, 2)

// Verify follow-up eval was created for the failed alloc
// and write the eval to the state store
alloc, err = h.State.AllocByID(ws, replacementAllocID)
require.NoError(t, err)
require.NotEmpty(t, alloc.FollowupEvalID)
require.Len(t, h.CreateEvals, 2)
followupEval = h.CreateEvals[1]
require.Equal(t, structs.EvalStatusPending, followupEval.Status)
require.Equal(t, now.Add(delayDuration), followupEval.WaitUntil)
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{followupEval}))

// "use up" resources on the node so the follow-up will block
node.NodeResources.Memory.MemoryMB = 200
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Process the follow-up eval, which results in a stop but not a replacement
require.NoError(t, h.Process(NewServiceScheduler, followupEval))
require.Len(t, h.Plans, 4)
require.Len(t, h.Plans[3].NodeUpdate, 1) // stop
require.Len(t, h.Plans[3].NodeAllocation, 0) // place

out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
require.Len(t, out, 2)

// Verify blocked eval was created and write it to state
require.Len(t, h.CreateEvals, 3)
blockedEval := h.CreateEvals[1]
require.Equal(t, structs.EvalTriggerRetryFailedAlloc, blockedEval.TriggeredBy)
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{blockedEval, followupEval}))

// "free up" resources on the node so the blocked eval will succeed
node.NodeResources.Memory.MemoryMB = 8000
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

require.NoError(t, h.Process(NewServiceScheduler, blockedEval))
require.Len(t, h.Plans, 5)
require.Len(t, h.Plans[4].NodeUpdate, 0) // stop
require.Len(t, h.Plans[4].NodeAllocation, 1) // place

out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
require.Len(t, out, 3)

for _, alloc := range out {
if alloc.ID != failedAllocID && alloc.ID != replacementAllocID {
require.NotNil(t, alloc.RescheduleTracker,
"replacement alloc should have reschedule tracker")
require.Len(t, alloc.RescheduleTracker.Events, 1)
}
}
}

// Tests that old reschedule attempts are pruned
func TestServiceSched_Reschedule_PruneEvents(t *testing.T) {
ci.Parallel(t)
Expand Down

0 comments on commit 0533b7c

Please sign in to comment.