Skip to content

Commit

Permalink
support for the managed-by label in Job
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Mar 1, 2024
1 parent 235d4ff commit 2517568
Show file tree
Hide file tree
Showing 13 changed files with 1,726 additions and 17 deletions.
8 changes: 8 additions & 0 deletions pkg/apis/batch/types.go
Expand Up @@ -44,13 +44,21 @@ const (
JobNameLabel = labelPrefix + LegacyJobNameLabel
// Controller UID is used for selectors and labels for jobs
ControllerUidLabel = labelPrefix + LegacyControllerUidLabel
// Label indicating the controller that manages a Job. The k8s Job controller
// reconciles Jobs that either don't have this label at all or whose label
// value is the reserved string `job-controller.k8s.io`, and skips reconciling
// Jobs with a custom value for this label.
JobManagedByLabel = labelPrefix + "managed-by"
// Annotation indicating the number of failures for the index corresponding
// to the pod, which are counted towards the backoff limit.
JobIndexFailureCountAnnotation = labelPrefix + "job-index-failure-count"
// Annotation indicating the number of failures for the index corresponding
// to the pod, which don't count towards the backoff limit, according to the
// pod failure policy. When the annotation is absent zero is implied.
JobIndexIgnoredFailureCountAnnotation = labelPrefix + "job-index-ignored-failure-count"
// JobControllerName is the reserved value of the managed-by label that
// identifies the built-in Job controller.
JobControllerName = "job-controller.k8s.io"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
215 changes: 208 additions & 7 deletions pkg/apis/batch/validation/validation.go
Expand Up @@ -19,6 +19,7 @@ package validation
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -36,6 +37,7 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
apivalidation "k8s.io/kubernetes/pkg/apis/core/validation"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)

// maxParallelismForIndexJob is the maximum parallelism that an Indexed Job
Expand Down Expand Up @@ -390,8 +392,9 @@ func validatePodFailurePolicyRuleOnExitCodes(onExitCode *batch.PodFailurePolicyO
}

// validateJobStatus validates a JobStatus and returns an ErrorList with any errors.
func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.ErrorList {
func validateJobStatus(job *batch.Job, fldPath *field.Path, opts JobStatusValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
status := job.Status
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Active), fldPath.Child("active"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Succeeded), fldPath.Child("succeeded"))...)
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Failed), fldPath.Child("failed"))...)
Expand Down Expand Up @@ -425,20 +428,103 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error
}
}
}
if opts.RejectCompleteJobWithFailedCondition {
if IsConditionTrue(status.Conditions, batch.JobComplete) && IsConditionTrue(status.Conditions, batch.JobFailed) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set Complete=True and Failed=true conditions"))
}
}
if opts.RejectCompleteJobWithFailureTargetCondition {
if IsConditionTrue(status.Conditions, batch.JobComplete) && IsConditionTrue(status.Conditions, batch.JobFailureTarget) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "cannot set Complete=True and FailureTarget=true conditions"))
}
}
if opts.RejectNotCompleteJobWithCompletionTime {
if status.CompletionTime != nil && !IsJobComplete(job) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completionTime"), status.CompletionTime, "cannot set completionTime when there is no Complete=True condition"))
}
}
if opts.RejectCompleteJobWithoutCompletionTime {
if status.CompletionTime == nil && IsJobComplete(job) {
allErrs = append(allErrs, field.Required(fldPath.Child("completionTime"), "completionTime is required for Complete jobs"))
}
}
if opts.RejectCompletionTimeBeforeStartTime {
if status.StartTime != nil && status.CompletionTime != nil && status.CompletionTime.Before(status.StartTime) {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completionTime"), status.CompletionTime, "completionTime cannot be set before startTime"))
}
}
isJobFinished := IsJobFinished(job)
if opts.RejectFinishedJobWithActivePods {
if status.Active > 0 && isJobFinished {
allErrs = append(allErrs, field.Invalid(fldPath.Child("active"), status.Active, "active>0 is invalid for finished job"))
}
}
if opts.RejectFinishedJobWithTerminatingPods {
	// A finished job must not report terminating pods. The error is attached
	// to the "terminating" field, so it reports the terminating count and
	// names that field in the message (not the active count).
	if status.Terminating != nil && *status.Terminating > 0 && isJobFinished {
		allErrs = append(allErrs, field.Invalid(fldPath.Child("terminating"), *status.Terminating, "terminating>0 is invalid for finished job"))
	}
}
if opts.RejectFinishedJobWithoutStartTime {
if status.StartTime == nil && isJobFinished {
allErrs = append(allErrs, field.Required(fldPath.Child("startTime"), "startTime is required for finished job"))
}
}
if opts.RejectFinishedJobWithUncountedTerminatedPods {
if isJobFinished && status.UncountedTerminatedPods != nil && len(status.UncountedTerminatedPods.Failed)+len(status.UncountedTerminatedPods.Succeeded) > 0 {
allErrs = append(allErrs, field.Invalid(fldPath.Child("uncountedTerminatedPods"), status.UncountedTerminatedPods, "uncountedTerminatedPods needs to be empty for finished job"))
}
}
if opts.RejectInvalidCompletedIndexes {
if job.Spec.Completions != nil {
if err := validateIndexesFormat(status.CompletedIndexes, int(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completedIndexes"), status.CompletedIndexes, fmt.Sprintf("error parsing completedIndexes: %s", err.Error())))
}
}
}
if opts.RejectInvalidFailedIndexes {
if job.Spec.Completions != nil && job.Spec.BackoffLimitPerIndex != nil && status.FailedIndexes != nil {
if err := validateIndexesFormat(*status.FailedIndexes, int(*job.Spec.Completions)); err != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), status.FailedIndexes, fmt.Sprintf("error parsing failedIndexes: %s", err.Error())))
}
}
}
isIndexed := ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) == batch.IndexedCompletion
if opts.RejectCompletedIndexesForNonIndexedJob {
if len(status.CompletedIndexes) != 0 && !isIndexed {
allErrs = append(allErrs, field.Invalid(fldPath.Child("completedIndexes"), status.CompletedIndexes, "cannot set non-empty completedIndexes when non-indexed completion mode"))
}
}
if opts.RejectFailedIndexesForNoBackoffLimitPerIndex {
if job.Spec.BackoffLimitPerIndex == nil && status.FailedIndexes != nil {
allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), *status.FailedIndexes, "cannot set non-null failedIndexes when backoffLimitPerIndex is null"))
}
}
if opts.RejectMoreReadyThanActivePods {
if status.Ready != nil && *status.Ready > status.Active {
allErrs = append(allErrs, field.Invalid(fldPath.Child("ready"), *status.Ready, "cannot set more ready pods than active"))
}
}
return allErrs
}

// ValidateJobUpdate validates an update of oldJob to job and returns an
// ErrorList with any errors. It combines the errors of the metadata update
// check with those of ValidateJobSpecUpdate, which is run on the two specs
// under the "spec" field path with the given opts.
func ValidateJobUpdate(job, oldJob *batch.Job, opts JobValidationOptions) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs := validateJobMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta)
allErrs = append(allErrs, ValidateJobSpecUpdate(job.Spec, oldJob.Spec, field.NewPath("spec"), opts)...)
return allErrs
}

// ValidateJobUpdateStatus validates an update to the status of a Job and
// returns an ErrorList with any errors. It combines the errors of the metadata
// update check with those of ValidateJobStatusUpdate.
func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateJobStatusUpdate(job.Status, oldJob.Status)...)
func ValidateJobUpdateStatus(job, oldJob *batch.Job, opts JobStatusValidationOptions) field.ErrorList {
allErrs := validateJobMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta)
allErrs = append(allErrs, ValidateJobStatusUpdate(job, oldJob, opts)...)
return allErrs
}

// validateJobMetaUpdate validates an update of a Job's ObjectMeta under the
// "metadata" field path. It runs the generic object-meta update validation and
// then passes the new and old values of the managed-by label to
// apivalidation.ValidateImmutableLabel, returning the errors of both checks.
func validateJobMetaUpdate(newMeta, oldMeta *metav1.ObjectMeta) field.ErrorList {
	metaPath := field.NewPath("metadata")
	errs := apivalidation.ValidateObjectMetaUpdate(newMeta, oldMeta, metaPath)

	newManagedBy := newMeta.Labels[batch.JobManagedByLabel]
	oldManagedBy := oldMeta.Labels[batch.JobManagedByLabel]
	labelErrs := apivalidation.ValidateImmutableLabel(newManagedBy, oldManagedBy, batch.JobManagedByLabel, metaPath)
	return append(errs, labelErrs...)
}

Expand Down Expand Up @@ -486,9 +572,38 @@ func validatePodTemplateUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path,
}

// ValidateJobStatusUpdate validates an update to a JobStatus and returns an ErrorList with any errors.
// It first validates the new status on its own via validateJobStatus, and then
// applies the old-versus-new transition checks below. Each transition check
// runs only when the corresponding field of opts is set.
func ValidateJobStatusUpdate(status, oldStatus batch.JobStatus) field.ErrorList {
func ValidateJobStatusUpdate(job, oldJob *batch.Job, opts JobStatusValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, validateJobStatus(&status, field.NewPath("status"))...)
statusFld := field.NewPath("status")
allErrs = append(allErrs, validateJobStatus(job, statusFld, opts)...)

// A Failed, Complete or FailureTarget condition that was True in the old
// status must still be True in the new status.
if opts.RejectDisablingTerminalCondition {
for _, cType := range []batch.JobConditionType{batch.JobFailed, batch.JobComplete, batch.JobFailureTarget} {
if IsConditionTrue(oldJob.Status.Conditions, cType) && !IsConditionTrue(job.Status.Conditions, cType) {
allErrs = append(allErrs, field.Invalid(statusFld.Child("conditions"), field.OmitValueType{}, fmt.Sprintf("cannot disable the terminal %s=True condition", string(cType))))
}
}
}
// The failed counter must not go below its old value.
if opts.RejectDecreasingFailedCounter {
if job.Status.Failed < oldJob.Status.Failed {
allErrs = append(allErrs, field.Invalid(statusFld.Child("failed"), job.Status.Failed, "cannot decrease the failed counter"))
}
}
// The succeeded counter must not go below its old value.
if opts.RejectDecreasingSucceededCounter {
if job.Status.Succeeded < oldJob.Status.Succeeded {
allErrs = append(allErrs, field.Invalid(statusFld.Child("succeeded"), job.Status.Succeeded, "cannot decrease the succeeded counter"))
}
}
// Only a change between two non-nil completion times is flagged here;
// setting or clearing completionTime is not rejected by this check.
if opts.RejectMutatingCompletionTime {
if job.Status.CompletionTime != nil && oldJob.Status.CompletionTime != nil && !ptr.Equal(job.Status.CompletionTime, oldJob.Status.CompletionTime) {
allErrs = append(allErrs, field.Invalid(statusFld.Child("completionTime"), job.Status.CompletionTime, "completionTime cannot be mutated"))
}
}
// startTime may only be cleared when spec.suspend is true; a nil
// spec.suspend is treated as false (not suspended).
if opts.RejectRemovingStartTimeForUnsuspendedJob {
if oldJob.Status.StartTime != nil && job.Status.StartTime == nil && !ptr.Deref(job.Spec.Suspend, false) {
allErrs = append(allErrs, field.Required(statusFld.Child("startTime"), "startTime cannot be removed for unsuspended job"))
}
}
return allErrs
}

Expand Down Expand Up @@ -666,6 +781,70 @@ func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts
return allErrs
}

// IsJobFinished reports whether the Job's status contains a Complete or a
// Failed condition whose status is True.
func IsJobFinished(job *batch.Job) bool {
	conditions := job.Status.Conditions
	for i := range conditions {
		if conditions[i].Status != api.ConditionTrue {
			continue
		}
		switch conditions[i].Type {
		case batch.JobComplete, batch.JobFailed:
			return true
		}
	}
	return false
}

// IsJobComplete reports whether the Job's status contains a Complete
// condition whose status is True.
func IsJobComplete(job *batch.Job) bool {
	conditions := job.Status.Conditions
	return IsConditionTrue(conditions, batch.JobComplete)
}

// IsJobFailed reports whether the Job's status contains a Failed condition
// whose status is True.
func IsJobFailed(job *batch.Job) bool {
	conditions := job.Status.Conditions
	return IsConditionTrue(conditions, batch.JobFailed)
}

// IsConditionTrue reports whether list contains a condition of type cType
// whose status is True.
func IsConditionTrue(list []batch.JobCondition, cType batch.JobConditionType) bool {
	for i := range list {
		if list[i].Type != cType {
			continue
		}
		if list[i].Status == api.ConditionTrue {
			return true
		}
	}
	return false
}

// validateIndexesFormat checks the textual list of indexes in indexesStr.
//
// The list is a comma-separated sequence of fragments, where each fragment is
// either a single index ("3") or an interval written as "first-last" ("5-7").
// An empty string is valid. Otherwise every index must parse as an integer, be
// smaller than completions, and be strictly greater than the index that
// precedes it (including the two bounds of one interval). The first violation
// found is returned as an error; nil means the list is well formed.
func validateIndexesFormat(indexesStr string, completions int) error {
	if len(indexesStr) == 0 {
		return nil
	}

	// parseBound converts a single bound and checks it against completions.
	parseBound := func(raw string) (int, error) {
		value, err := strconv.Atoi(raw)
		switch {
		case err != nil:
			return 0, fmt.Errorf("cannot convert string to integer for index: %q", raw)
		case value >= completions:
			return 0, fmt.Errorf("too large index: %q", raw)
		}
		return value, nil
	}

	// prev holds the most recently accepted index; it is meaningful only once
	// seen is true.
	prev, seen := 0, false
	for _, fragment := range strings.Split(indexesStr, ",") {
		first, last, isInterval := strings.Cut(fragment, "-")
		if isInterval && strings.Contains(last, "-") {
			return fmt.Errorf("the fragment %q violates the requirement that an index interval can have at most two parts separated by '-'", fragment)
		}

		low, err := parseBound(first)
		if err != nil {
			return err
		}
		if seen && prev >= low {
			return fmt.Errorf("non-increasing order, previous: %d, current: %d", prev, low)
		}
		prev, seen = low, true

		if !isInterval {
			continue
		}
		high, err := parseBound(last)
		if err != nil {
			return err
		}
		if prev >= high {
			return fmt.Errorf("non-increasing order, previous: %d, current: %d", prev, high)
		}
		prev = high
	}
	return nil
}

type JobValidationOptions struct {
apivalidation.PodValidationOptions
// Allow mutable node affinity, selector and tolerations of the template
Expand All @@ -675,3 +854,25 @@ type JobValidationOptions struct {
// Require Job to have the label on batch.kubernetes.io/job-name and batch.kubernetes.io/controller-uid
RequirePrefixedLabels bool
}

// JobStatusValidationOptions selects which Job status checks are enforced by
// validateJobStatus and ValidateJobStatusUpdate. Each field enables one check;
// a check whose field is false is not run.
type JobStatusValidationOptions struct {
// Reject an update that lowers status.succeeded below its old value.
RejectDecreasingSucceededCounter bool
// Reject an update that lowers status.failed below its old value.
RejectDecreasingFailedCounter bool
// Reject an update in which a Failed, Complete or FailureTarget condition
// that was True in the old status is no longer True.
RejectDisablingTerminalCondition bool
// Reject status.completedIndexes that fails validateIndexesFormat against
// spec.completions; checked only when spec.completions is set.
RejectInvalidCompletedIndexes bool
// Reject status.failedIndexes that fails validateIndexesFormat against
// spec.completions; checked only when spec.completions,
// spec.backoffLimitPerIndex and status.failedIndexes are all set.
RejectInvalidFailedIndexes bool
// Reject non-empty status.completedIndexes when the completion mode is not
// Indexed (a nil completion mode counts as NonIndexed).
RejectCompletedIndexesForNonIndexedJob bool
// Reject non-nil status.failedIndexes when spec.backoffLimitPerIndex is nil.
RejectFailedIndexesForNoBackoffLimitPerIndex bool
// Reject status.ready greater than status.active.
RejectMoreReadyThanActivePods bool
// Reject status.active > 0 for a finished job (Complete or Failed is True).
RejectFinishedJobWithActivePods bool
// Reject status.terminating > 0 for a finished job.
RejectFinishedJobWithTerminatingPods bool
// Reject a finished job whose status.startTime is nil.
RejectFinishedJobWithoutStartTime bool
// Reject a finished job whose status.uncountedTerminatedPods still lists
// any succeeded or failed entries.
RejectFinishedJobWithUncountedTerminatedPods bool
// Reject an update that clears status.startTime unless spec.suspend is
// true (a nil spec.suspend counts as false).
RejectRemovingStartTimeForUnsuspendedJob bool
// Reject status.completionTime that is before status.startTime.
RejectCompletionTimeBeforeStartTime bool
// Reject an update that changes one non-nil status.completionTime to a
// different non-nil value.
RejectMutatingCompletionTime bool
// Reject a job with Complete=True whose status.completionTime is nil.
RejectCompleteJobWithoutCompletionTime bool
// Reject a non-nil status.completionTime on a job without Complete=True.
RejectNotCompleteJobWithCompletionTime bool
// Reject a status in which both Complete and Failed are True.
RejectCompleteJobWithFailedCondition bool
// Reject a status in which both Complete and FailureTarget are True.
RejectCompleteJobWithFailureTargetCondition bool
}

0 comments on commit 2517568

Please sign in to comment.