WIP: Job: calculate the job completion before calculating the activeDeadlineSeconds #121863

Open: wants to merge 3 commits into master

109 changes: 65 additions & 44 deletions pkg/controller/job/job_controller.go
@@ -855,12 +855,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
// check if the number of pod restart exceeds backoff (for restart OnFailure only)
// OR if the number of failed jobs increased since the last syncJob
jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonBackoffLimitExceeded, "Job has reached the specified backoff limit", jm.clock.Now())
} else if jm.pastActiveDeadline(&job) {
jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline", jm.clock.Now())
} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
jm.queue.AddAfter(key, syncDuration)
}
}

@@ -884,6 +878,41 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
}
}
}

complete := false
if jobCtx.finishedCondition == nil {
if job.Spec.Completions == nil {
// This type of job is complete when any pod exits with success.
// Each pod is capable of
// determining whether or not the entire Job is done. Subsequent pods are
// not expected to fail, but if they do, the failure is ignored. Once any
// pod succeeds, the controller waits for remaining pods to finish, and
// then the job is complete.
complete = jobCtx.succeeded > 0 && active == 0
} else {
// If job is scaled down and the number of succeeded pods already reached completions,
// job should be marked as complete here.
wantActive := active
if feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) && satisfiedExpectations && job.DeletionTimestamp == nil {
Contributor:
Do we need this if? Maybe, if needed, it could be inside the wantActivePods function?

Member Author:
I think we should keep this if here, since wantActivePods is also called at L1478.
@mimowo WDYT?

Member Author:
@mimowo Friendly ping :)

Contributor:
My question is motivated by the fact that these two code paths could end up setting different values for wantActive. For example, if satisfiedExpectations == false, feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) == true, and job.DeletionTimestamp == nil, you will have wantActive = active here, but wantActive = wantActivePods(&job, jobCtx) inside manageJob.

I'm wondering if we can / should avoid that. No specific failure scenario at the moment.

Member Author (@tenzen-y, Nov 30, 2023):
Ah, you're right. If we need this if, we should move the check inside wantActivePods(). Thanks!
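// Editor's sketch of the refactor floated in this thread: moving the
// ElasticIndexedJob/expectations/deletion guard inside the helper so that syncJob and
// manageJob cannot compute different wantActive values. This is illustrative only; the
// extra satisfiedExpectations parameter and the name wantActivePodsOrObserved are
// hypothetical, and the snippet assumes job_controller.go's existing imports and types.
func wantActivePodsOrObserved(job *batch.Job, jobCtx *syncJobCtx, satisfiedExpectations bool) int32 {
	// Mirror the guard used at the syncJob call site above: fall back to the observed
	// active count while the job is being deleted or expectations are not yet satisfied.
	if !feature.DefaultFeatureGate.Enabled(features.ElasticIndexedJob) ||
		!satisfiedExpectations || job.DeletionTimestamp != nil {
		return int32(len(jobCtx.activePods))
	}
	return wantActivePods(job, jobCtx)
}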

wantActive = wantActivePods(&job, jobCtx)
}
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
complete = jobCtx.succeeded >= *job.Spec.Completions && wantActive == 0
}
if !complete {
if jm.pastActiveDeadline(&job) {
jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline", jm.clock.Now())
} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
jm.queue.AddAfter(key, syncDuration)
}
}
}

suspendCondChanged := false
// Remove active pods if Job failed.
if jobCtx.finishedCondition != nil {
Expand All @@ -901,22 +930,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx)
manageJobCalled = true
}
complete := false
if job.Spec.Completions == nil {
// This type of job is complete when any pod exits with success.
// Each pod is capable of
// determining whether or not the entire Job is done. Subsequent pods are
// not expected to fail, but if they do, the failure is ignored. Once any
// pod succeeds, the controller waits for remaining pods to finish, and
// then the job is complete.
complete = jobCtx.succeeded > 0 && active == 0
} else {
// Job specifies a number of completions. This type of job signals
// success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached.
complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
}
if complete {
jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
} else if manageJobCalled {
@@ -1486,7 +1499,6 @@ func jobSuspended(job *batch.Job) bool {
func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
logger := klog.FromContext(ctx)
active := int32(len(jobCtx.activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
@@ -1513,27 +1525,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
terminating = *jobCtx.terminating
}
}
wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if jobCtx.succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - jobCtx.succeeded
if wantActive > parallelism {
wantActive = parallelism
}
if wantActive < 0 {
wantActive = 0
}
}
wantActive := wantActivePods(job, jobCtx)

rmAtLeast := active - wantActive
if rmAtLeast < 0 {
@@ -1676,6 +1668,35 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionTracking, nil
}

// wantActivePods returns a desired number of active pods.
func wantActivePods(job *batch.Job, jobCtx *syncJobCtx) int32 {
active := int32(len(jobCtx.activePods))
parallelism := *job.Spec.Parallelism
wantActive := int32(0)

if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// once success, in which leave whatever is running, running.
if jobCtx.succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - jobCtx.succeeded
if wantActive > parallelism {
wantActive = parallelism
}
if wantActive < 0 {
wantActive = 0
}
}
return wantActive
}

// getPodCreationInfoForIndependentIndexes returns a sub-list of all indexes
// to create that contains those which can be already created. In case no indexes
// are ready to create pods, it returns the lowest remaining time to create pods
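In short, the hunks above move the completion check ahead of the activeDeadlineSeconds check in syncJob: a Job whose pods have already succeeded (for example, activeDeadlineSeconds: 10 with a start time 15 seconds in the past and succeeded equal to completions, as in the new test cases below) is now marked Complete instead of Failed with reason DeadlineExceeded. A condensed, illustrative sketch of the resulting order follows; isComplete is a hypothetical stand-in for the inline completion logic, and the snippet assumes the fields and imports of job_controller.go.

// Condensed view of the reordered logic (illustrative, not the exact controller code).
if jobCtx.finishedCondition == nil && !isComplete(&job, jobCtx, wantActive) {
	// Only jobs that are not yet complete can fail the deadline check or need a resync.
	if jm.pastActiveDeadline(&job) {
		jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue,
			batch.JobReasonDeadlineExceeded, "Job was active longer than specified deadline", jm.clock.Now())
	} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
		// Requeue the job for when the remaining deadline elapses so the timeout is
		// still detected promptly for jobs that never complete.
		syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
		jm.queue.AddAfter(key, syncDuration)
	}
}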
29 changes: 29 additions & 0 deletions pkg/controller/job/job_controller_test.go
@@ -2089,6 +2089,35 @@ func TestSyncJobPastDeadline(t *testing.T) {
expectedCondition: batch.JobSuspended,
expectedConditionReason: "JobSuspended",
},
"nonIndexed job succeeded and exceeded activeDeadlineSeconds": {
Contributor:
For these test cases, do they reproduce the bug if this code is removed?

Member Author:
Yes, we can reproduce issues like the following:

$ go test -run "TestSyncJobPastDeadline" ./pkg/controller/job/...
?       k8s.io/kubernetes/pkg/controller/job/config     [no test files]
?       k8s.io/kubernetes/pkg/controller/job/config/v1alpha1    [no test files]
?       k8s.io/kubernetes/pkg/controller/job/metrics    [no test files]
--- FAIL: TestSyncJobPastDeadline (0.00s)
    --- FAIL: TestSyncJobPastDeadline/nonIndexed_job_succeeded_and_exceeded_activeDeadlineSeconds (0.00s)
        job_controller_test.go:2094: Expected fail condition.  Got []v1.JobCondition{v1.JobCondition{Type:"Failed", Status:"True", LastProbeTime:time.Date(2023, time.November, 14, 13, 40, 12, 766800000, time.Local), LastTransitionTime:time.Date(2023, time.November, 14, 13, 40, 12, 766800000, time.Local), Reason:"DeadlineExceeded", Message:"Job was active longer than specified deadline"}}
    --- FAIL: TestSyncJobPastDeadline/indexed_job_succeeded_and_exceeded_activeDeadlineSeconds (0.00s)
        job_controller_test.go:2094: Expected fail condition.  Got []v1.JobCondition{v1.JobCondition{Type:"Failed", Status:"True", LastProbeTime:time.Date(2023, time.November, 14, 13, 40, 12, 767083000, time.Local), LastTransitionTime:time.Date(2023, time.November, 14, 13, 40, 12, 767083000, time.Local), Reason:"DeadlineExceeded", Message:"Job was active longer than specified deadline"}}
FAIL
FAIL    k8s.io/kubernetes/pkg/controller/job    0.817s
FAIL

Contributor:
Sounds good.

parallelism: 1,
activeDeadlineSeconds: 10,
startTime: 15,
succeededPods: 1,
expectedSucceeded: 1,
expectedCondition: batch.JobComplete,
},
"indexed job succeeded and exceeded activeDeadlineSeconds": {
parallelism: 2,
completions: 2,
activeDeadlineSeconds: 10,
startTime: 15,
succeededPods: 2,
expectedSucceeded: 2,
expectedCondition: batch.JobComplete,
},
"elasticIndexed job is scaled down and exceeded activeDeadlineSeconds; the number of succeeded pods already reached the completions": {
parallelism: 1,
completions: 1,
activeDeadlineSeconds: 10,
startTime: 15,
succeededPods: 1,
activePods: 2,
expectedFailed: 2,
expectedSucceeded: 1,
expectedDeletions: 2,
expectedCondition: batch.JobComplete,
},
}

for name, tc := range testCases {