Skip to content

Commit

Permalink
Job: Mark jobs as complete in the scaled down scenario
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Mar 2, 2024
1 parent 4364e6e commit 15bd979
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
60 changes: 37 additions & 23 deletions pkg/controller/job/job_controller.go
Expand Up @@ -852,11 +852,17 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// then the job is complete.
complete = jobCtx.succeeded > 0 && active == 0
} else {
// If job is scaled down and the number of succeeded pods already reached completions,
// job should be marked as complete here.
wantActive := active
if feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) && satisfiedExpectations && job.DeletionTimestamp == nil {
wantActive = wantActivePods(&job, jobCtx)
}
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
complete = jobCtx.succeeded >= *job.Spec.Completions && wantActive == 0
}
if !complete {
if jm.pastActiveDeadline(&job) {
Expand Down Expand Up @@ -1443,7 +1449,6 @@ func jobSuspended(job *batch.Job) bool {
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
logger := klog.FromContext(ctx)
active := int32(len(jobCtx.activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
Expand All @@ -1470,27 +1475,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
terminating = *jobCtx.terminating
}
}
wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if jobCtx.succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - jobCtx.succeeded
if wantActive > parallelism {
wantActive = parallelism
}
if wantActive < 0 {
wantActive = 0
}
}
wantActive := wantActivePods(job, jobCtx)

rmAtLeast := active - wantActive
if rmAtLeast < 0 {
Expand Down Expand Up @@ -1633,6 +1618,35 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionTracking, nil
}

// wantActivePods returns a desired number of active pods.
func wantActivePods(job *batch.Job, jobCtx *syncJobCtx) int32 {
active := int32(len(jobCtx.activePods))
parallelism := *job.Spec.Parallelism
wantActive := int32(0)

if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if jobCtx.succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - jobCtx.succeeded
if wantActive > parallelism {
wantActive = parallelism
}
if wantActive < 0 {
wantActive = 0
}
}
return wantActive
}

// getPodCreationInfoForIndependentIndexes returns a sub-list of all indexes
// to create that contains those which can be already created. In case no indexes
// are ready to create pods, it returns the lowest remaining time to create pods
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/job/job_controller_test.go
Expand Up @@ -2059,6 +2059,18 @@ func TestSyncJobPastDeadline(t *testing.T) {
expectedSucceeded: 2,
expectedCondition: batch.JobComplete,
},
"elasticIndexed job is scaled down and exceeded activeDeadlineSeconds; the number of succeeded pods already reached the completions": {
parallelism: 1,
completions: 1,
activeDeadlineSeconds: 10,
startTime: 15,
succeededPods: 1,
activePods: 2,
expectedFailed: 2,
expectedSucceeded: 1,
expectedDeletions: 2,
expectedCondition: batch.JobComplete,
},
}

for name, tc := range testCases {
Expand Down

0 comments on commit 15bd979

Please sign in to comment.