Skip to content

Commit

Permalink
support for the managed-by label in Job
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Feb 15, 2024
1 parent 0a162a9 commit 6d86701
Show file tree
Hide file tree
Showing 11 changed files with 1,120 additions and 7 deletions.
7 changes: 7 additions & 0 deletions pkg/apis/batch/types.go
Expand Up @@ -44,13 +44,20 @@ const (
JobNameLabel = labelPrefix + LegacyJobNameLabel
// Controller UID is used for selectors and labels for jobs
ControllerUidLabel = labelPrefix + LegacyControllerUidLabel
// Label indicating the controller that manages a Job. When the label is
// absent on a job object, or its value equals "job-controller.k8s.io" then
// the Job is reconciled by the built-in Job controller.
JobManagedByLabel = labelPrefix + "managed-by"
// Annotation indicating the number of failures for the index corresponding
// to the pod, which are counted towards the backoff limit.
JobIndexFailureCountAnnotation = labelPrefix + "job-index-failure-count"
// Annotation indicating the number of failures for the index corresponding
// to the pod, which don't count towards the backoff limit, according to the
// pod failure policy. When the annotation is absent zero is implied.
JobIndexIgnoredFailureCountAnnotation = labelPrefix + "job-index-ignored-failure-count"
// JobControllerName reserved value for the managed-by label for the built-in
// Job controller.
JobControllerName = "job-controller.k8s.io"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
119 changes: 113 additions & 6 deletions pkg/apis/batch/validation/validation.go
Expand Up @@ -19,6 +19,7 @@ package validation
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -31,10 +32,13 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core"
apivalidation "k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)

// maxParallelismForIndexJob is the maximum parallelism that an Indexed Job
Expand Down Expand Up @@ -389,8 +393,9 @@ func validatePodFailurePolicyRuleOnExitCodes(onExitCode *batch.PodFailurePolicyO
}

// validateJobStatus validates a JobStatus and returns an ErrorList with any errors.
func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.ErrorList {
func validateJobStatus(job *batch.Job, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
status := job.Status
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Active), fldPath.Child("active"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Succeeded), fldPath.Child("succeeded"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Failed), fldPath.Child("failed"))...)
Expand All @@ -400,6 +405,27 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error
if status.Terminating != nil {
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*status.Terminating), fldPath.Child("terminating"))...)
}

if utilfeature.DefaultFeatureGate.Enabled(features.JobManagedByLabel) {
isComplete := isConditionTrue(status.Conditions, batch.JobComplete)
if isComplete && isConditionTrue(status.Conditions, batch.JobFailed) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "both Complete=True and Failed=True conditions are set"))
}
if status.CompletionTime != nil && !isComplete {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completionTime"), status.CompletionTime, "completeTime can only be set when Complete=True"))
}
if ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) == batch.NonIndexedCompletion {
if status.CompletedIndexes != "" {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completedIndexes"), status.CompletedIndexes, "non-empty completedIndexes when non-indexed completion mode"))
}
}
if job.Spec.BackoffLimitPerIndex == nil {
if status.FailedIndexes != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), *status.FailedIndexes, "non-nil failedIndexes when backoffLimitPerIndex=nil"))
}
}
}

if status.UncountedTerminatedPods != nil {
path := fldPath.Child("uncountedTerminatedPods")
seen := sets.NewString()
Expand Down Expand Up @@ -429,15 +455,23 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error

// ValidateJobUpdate validates an update to a Job and returns an ErrorList with any errors.
func ValidateJobUpdate(job, oldJob *batch.Job, opts JobValidationOptions) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs := validateJobMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateJobSpecUpdate(job.Spec, oldJob.Spec, field.NewPath("spec"), opts)...)
return allErrs
}

// ValidateJobUpdateStatus validates an update to the status of a Job and returns an ErrorList with any errors.
func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateJobStatusUpdate(job.Status, oldJob.Status)...)
allErrs := validateJobMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateJobStatusUpdate(job, oldJob)...)
return allErrs
}

func validateJobMetaUpdate(newMeta, oldMeta *metav1.ObjectMeta, fldPath *field.Path) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(newMeta, oldMeta, fldPath)
if utilfeature.DefaultFeatureGate.Enabled(features.JobManagedByLabel) {
allErrs = append(allErrs, apivalidation.ValidateImmutableLabel(newMeta.Labels[batch.JobManagedByLabel], oldMeta.Labels[batch.JobManagedByLabel], batch.JobManagedByLabel, fldPath)...)
}
return allErrs
}

Expand Down Expand Up @@ -485,9 +519,28 @@ func validatePodTemplateUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path,
}

// ValidateJobStatusUpdate validates an update to a JobStatus and returns an ErrorList with any errors.
func ValidateJobStatusUpdate(status, oldStatus batch.JobStatus) field.ErrorList {
func ValidateJobStatusUpdate(job, oldJob *batch.Job) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, validateJobStatus(&status, field.NewPath("status"))...)
statusFld := field.NewPath("status")
allErrs = append(allErrs, validateJobStatus(job, statusFld)...)

if utilfeature.DefaultFeatureGate.Enabled(features.JobManagedByLabel) {
for _, cType := range []batch.JobConditionType{batch.JobFailed, batch.JobComplete, batch.JobFailureTarget} {
if isConditionTrue(oldJob.Status.Conditions, cType) && !isConditionTrue(job.Status.Conditions, cType) {
allErrs = append(allErrs, field.Invalid(statusFld.Child("conditions"), field.OmitValueType{}, fmt.Sprintf("disablement of %s=True condition", string(cType))))
}
}
if job.Status.CompletedIndexes != oldJob.Status.CompletedIndexes {
if err := validateIndexesFormat(job.Status.CompletedIndexes, int(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(statusFld.Child("completedIndexes"), job.Status.CompletedIndexes, fmt.Sprintf("parsing error: %s", err.Error())))
}
}
if job.Status.FailedIndexes != oldJob.Status.FailedIndexes && job.Status.FailedIndexes != nil {
if err := validateIndexesFormat(*job.Status.FailedIndexes, int(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(statusFld.Child("failedIndexes"), job.Status.FailedIndexes, fmt.Sprintf("parsing error: %s", err.Error())))
}
}
}
return allErrs
}

Expand Down Expand Up @@ -665,6 +718,60 @@ func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts
return allErrs
}

func isConditionTrue(list []batch.JobCondition, cType batch.JobConditionType) bool {
if condition := findConditionByType(list, cType); condition != nil {
return condition.Status == api.ConditionTrue
}
return false
}

func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition {
for i := range list {
if list[i].Type == cType {
return &list[i]
}
}
return nil
}

func validateIndexesFormat(indexesStr string, completions int) error {
if indexesStr == "" {
return nil
}
var lastIndex *int
for _, intervalStr := range strings.Split(indexesStr, ",") {
limitsStr := strings.Split(intervalStr, "-")
if len(limitsStr) > 2 {
return fmt.Errorf("too many parts separated by '-' in %q", intervalStr)
}
x, err := strconv.Atoi(limitsStr[0])
if err != nil {
return fmt.Errorf("cannot parse integer index: %q", limitsStr[0])
}
if x >= completions {
return fmt.Errorf("too large index: %q", limitsStr[0])
}
if lastIndex != nil && *lastIndex >= x {
return fmt.Errorf("non-increasing order, previous: %d, current: %d", *lastIndex, x)
}
lastIndex = ptr.To[int](x)
if len(limitsStr) > 1 {
y, err := strconv.Atoi(limitsStr[1])
if err != nil {
return fmt.Errorf("cannot parse integer index: %q", limitsStr[1])
}
if y >= completions {
return fmt.Errorf("too large index: %q", limitsStr[1])
}
if *lastIndex >= y {
return fmt.Errorf("non-increasing order, previous: %d, current: %d", *lastIndex, y)
}
lastIndex = ptr.To[int](y)
}
}
return nil
}

type JobValidationOptions struct {
apivalidation.PodValidationOptions
// Allow mutable node affinity, selector and tolerations of the template
Expand Down

0 comments on commit 6d86701

Please sign in to comment.