From 6d867014eddf1a4c0cd8ce2676abd829a9112ed0 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Tue, 13 Feb 2024 17:46:52 +0100 Subject: [PATCH] support for the managed-by label in Job --- pkg/apis/batch/types.go | 7 + pkg/apis/batch/validation/validation.go | 119 +++++- pkg/apis/batch/validation/validation_test.go | 413 +++++++++++++++++++ pkg/apis/core/validation/validation.go | 8 + pkg/controller/job/job_controller.go | 44 +- pkg/controller/job/job_controller_test.go | 144 +++++++ pkg/controller/job/metrics/metrics.go | 15 + pkg/features/kube_features.go | 9 + staging/src/k8s.io/api/batch/v1/types.go | 7 + test/e2e/apps/job.go | 103 +++++ test/integration/job/job_test.go | 258 ++++++++++++ 11 files changed, 1120 insertions(+), 7 deletions(-) diff --git a/pkg/apis/batch/types.go b/pkg/apis/batch/types.go index 8b8cadaaf4255..1f8010e24aae8 100644 --- a/pkg/apis/batch/types.go +++ b/pkg/apis/batch/types.go @@ -44,6 +44,10 @@ const ( JobNameLabel = labelPrefix + LegacyJobNameLabel // Controller UID is used for selectors and labels for jobs ControllerUidLabel = labelPrefix + LegacyControllerUidLabel + // Label indicating the controller that manages a Job. When the label is + // absent on a job object, or its value equals "job-controller.k8s.io" then + // the Job is reconciled by the built-in Job controller. + JobManagedByLabel = labelPrefix + "managed-by" // Annotation indicating the number of failures for the index corresponding // to the pod, which are counted towards the backoff limit. JobIndexFailureCountAnnotation = labelPrefix + "job-index-failure-count" @@ -51,6 +55,9 @@ const ( // to the pod, which don't count towards the backoff limit, according to the // pod failure policy. When the annotation is absent zero is implied. JobIndexIgnoredFailureCountAnnotation = labelPrefix + "job-index-ignored-failure-count" + // JobControllerName reserved value for the managed-by label for the built-in + // Job controller. + JobControllerName = "job-controller.k8s.io" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go index e8c22ba7be680..c63b8d2ad88f4 100644 --- a/pkg/apis/batch/validation/validation.go +++ b/pkg/apis/batch/validation/validation.go @@ -19,6 +19,7 @@ package validation import ( "fmt" "regexp" + "strconv" "strings" "time" @@ -31,10 +32,13 @@ import ( "k8s.io/apimachinery/pkg/util/sets" apimachineryvalidation "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/validation/field" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" apivalidation "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) // maxParallelismForIndexJob is the maximum parallelism that an Indexed Job @@ -389,8 +393,9 @@ func validatePodFailurePolicyRuleOnExitCodes(onExitCode *batch.PodFailurePolicyO } // validateJobStatus validates a JobStatus and returns an ErrorList with any errors. -func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.ErrorList { +func validateJobStatus(job *batch.Job, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} + status := job.Status allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Active), fldPath.Child("active"))...) allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Succeeded), fldPath.Child("succeeded"))...) 
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(status.Failed), fldPath.Child("failed"))...) @@ -400,6 +405,27 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error if status.Terminating != nil { allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*status.Terminating), fldPath.Child("terminating"))...) } + + if utilfeature.DefaultFeatureGate.Enabled(features.JobManagedByLabel) { + isComplete := isConditionTrue(status.Conditions, batch.JobComplete) + if isComplete && isConditionTrue(status.Conditions, batch.JobFailed) { + allErrs = append(allErrs, field.Invalid(fldPath.Child("conditions"), field.OmitValueType{}, "both Complete=True and Failed=True conditions are set")) + } + if status.CompletionTime != nil && !isComplete { + allErrs = append(allErrs, field.Invalid(fldPath.Child("completionTime"), status.CompletionTime, "completeTime can only be set when Complete=True")) + } + if ptr.Deref(job.Spec.CompletionMode, batch.NonIndexedCompletion) == batch.NonIndexedCompletion { + if status.CompletedIndexes != "" { + allErrs = append(allErrs, field.Invalid(fldPath.Child("completedIndexes"), status.CompletedIndexes, "non-empty completedIndexes when non-indexed completion mode")) + } + } + if job.Spec.BackoffLimitPerIndex == nil { + if status.FailedIndexes != nil { + allErrs = append(allErrs, field.Invalid(fldPath.Child("failedIndexes"), *status.FailedIndexes, "non-nil failedIndexes when backoffLimitPerIndex=nil")) + } + } + } + if status.UncountedTerminatedPods != nil { path := fldPath.Child("uncountedTerminatedPods") seen := sets.NewString() @@ -429,15 +455,23 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error // ValidateJobUpdate validates an update to a Job and returns an ErrorList with any errors. func ValidateJobUpdate(job, oldJob *batch.Job, opts JobValidationOptions) field.ErrorList { - allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata")) + allErrs := validateJobMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata")) allErrs = append(allErrs, ValidateJobSpecUpdate(job.Spec, oldJob.Spec, field.NewPath("spec"), opts)...) return allErrs } // ValidateJobUpdateStatus validates an update to the status of a Job and returns an ErrorList with any errors. func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList { - allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata")) - allErrs = append(allErrs, ValidateJobStatusUpdate(job.Status, oldJob.Status)...) + allErrs := validateJobMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata")) + allErrs = append(allErrs, ValidateJobStatusUpdate(job, oldJob)...) + return allErrs +} + +func validateJobMetaUpdate(newMeta, oldMeta *metav1.ObjectMeta, fldPath *field.Path) field.ErrorList { + allErrs := apivalidation.ValidateObjectMetaUpdate(newMeta, oldMeta, fldPath) + if utilfeature.DefaultFeatureGate.Enabled(features.JobManagedByLabel) { + allErrs = append(allErrs, apivalidation.ValidateImmutableLabel(newMeta.Labels[batch.JobManagedByLabel], oldMeta.Labels[batch.JobManagedByLabel], batch.JobManagedByLabel, fldPath)...) + } return allErrs } @@ -485,9 +519,28 @@ func validatePodTemplateUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, } // ValidateJobStatusUpdate validates an update to a JobStatus and returns an ErrorList with any errors. 
-func ValidateJobStatusUpdate(status, oldStatus batch.JobStatus) field.ErrorList { +func ValidateJobStatusUpdate(job, oldJob *batch.Job) field.ErrorList { allErrs := field.ErrorList{} - allErrs = append(allErrs, validateJobStatus(&status, field.NewPath("status"))...) + statusFld := field.NewPath("status") + allErrs = append(allErrs, validateJobStatus(job, statusFld)...) + + if utilfeature.DefaultFeatureGate.Enabled(features.JobManagedByLabel) { + for _, cType := range []batch.JobConditionType{batch.JobFailed, batch.JobComplete, batch.JobFailureTarget} { + if isConditionTrue(oldJob.Status.Conditions, cType) && !isConditionTrue(job.Status.Conditions, cType) { + allErrs = append(allErrs, field.Invalid(statusFld.Child("conditions"), field.OmitValueType{}, fmt.Sprintf("disablement of %s=True condition", string(cType)))) + } + } + if job.Status.CompletedIndexes != oldJob.Status.CompletedIndexes { + if err := validateIndexesFormat(job.Status.CompletedIndexes, int(*job.Spec.Completions)); err != nil { + allErrs = append(allErrs, field.Invalid(statusFld.Child("completedIndexes"), job.Status.CompletedIndexes, fmt.Sprintf("parsing error: %s", err.Error()))) + } + } + if job.Status.FailedIndexes != oldJob.Status.FailedIndexes && job.Status.FailedIndexes != nil { + if err := validateIndexesFormat(*job.Status.FailedIndexes, int(*job.Spec.Completions)); err != nil { + allErrs = append(allErrs, field.Invalid(statusFld.Child("failedIndexes"), job.Status.FailedIndexes, fmt.Sprintf("parsing error: %s", err.Error()))) + } + } + } return allErrs } @@ -665,6 +718,60 @@ func validateCompletions(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts return allErrs } +func isConditionTrue(list []batch.JobCondition, cType batch.JobConditionType) bool { + if condition := findConditionByType(list, cType); condition != nil { + return condition.Status == api.ConditionTrue + } + return false +} + +func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType) *batch.JobCondition { + for i := range list { + if list[i].Type == cType { + return &list[i] + } + } + return nil +} + +func validateIndexesFormat(indexesStr string, completions int) error { + if indexesStr == "" { + return nil + } + var lastIndex *int + for _, intervalStr := range strings.Split(indexesStr, ",") { + limitsStr := strings.Split(intervalStr, "-") + if len(limitsStr) > 2 { + return fmt.Errorf("too many parts separated by '-' in %q", intervalStr) + } + x, err := strconv.Atoi(limitsStr[0]) + if err != nil { + return fmt.Errorf("cannot parse integer index: %q", limitsStr[0]) + } + if x >= completions { + return fmt.Errorf("too large index: %q", limitsStr[0]) + } + if lastIndex != nil && *lastIndex >= x { + return fmt.Errorf("non-increasing order, previous: %d, current: %d", *lastIndex, x) + } + lastIndex = ptr.To[int](x) + if len(limitsStr) > 1 { + y, err := strconv.Atoi(limitsStr[1]) + if err != nil { + return fmt.Errorf("cannot parse integer index: %q", limitsStr[1]) + } + if y >= completions { + return fmt.Errorf("too large index: %q", limitsStr[1]) + } + if *lastIndex >= y { + return fmt.Errorf("non-increasing order, previous: %d, current: %d", *lastIndex, y) + } + lastIndex = ptr.To[int](y) + } + } + return nil +} + type JobValidationOptions struct { apivalidation.PodValidationOptions // Allow mutable node affinity, selector and tolerations of the template diff --git a/pkg/apis/batch/validation/validation_test.go b/pkg/apis/batch/validation/validation_test.go index 5288219551214..c4d436e2107fd 100644 --- 
a/pkg/apis/batch/validation/validation_test.go +++ b/pkg/apis/batch/validation/validation_test.go @@ -17,6 +17,7 @@ limitations under the License. package validation import ( + "errors" _ "time/tzdata" "fmt" @@ -29,10 +30,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation/field" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" corevalidation "k8s.io/kubernetes/pkg/apis/core/validation" + "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" ) var ( @@ -1326,6 +1331,8 @@ func TestValidateJobUpdate(t *testing.T) { Effect: api.TaintEffectPreferNoSchedule, }} cases := map[string]struct { + enableManagedByLabel bool + old batch.Job update func(*batch.Job) opts JobValidationOptions @@ -1349,6 +1356,44 @@ func TestValidateJobUpdate(t *testing.T) { job.Spec.ManualSelector = pointer.Bool(true) }, }, + "update managed-by label; feature disabled": { + enableManagedByLabel: false, + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{}, + }, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + }, + }, + update: func(job *batch.Job) { + job.Labels[batch.JobManagedByLabel] = "custom-controller" + }, + }, + "update managed-by label; feature enabled": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{}, + }, + Spec: batch.JobSpec{ + Selector: validGeneratedSelector, + Template: validPodTemplateSpecForGenerated, + }, + }, + update: func(job *batch.Job) { + job.Labels[batch.JobManagedByLabel] = "custom-controller" + }, + err: &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "metadata.labels.batch.kubernetes.io/managed-by", + }, + }, "immutable completions for non-indexed jobs": { old: batch.Job{ ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault}, @@ -1997,6 +2042,7 @@ func TestValidateJobUpdate(t *testing.T) { ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail") for k, tc := range cases { t.Run(k, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobManagedByLabel, tc.enableManagedByLabel)() tc.old.ResourceVersion = "1" update := tc.old.DeepCopy() tc.update(update) @@ -2013,7 +2059,17 @@ func TestValidateJobUpdate(t *testing.T) { } func TestValidateJobUpdateStatus(t *testing.T) { + now := metav1.Now() + validObjectMeta := metav1.ObjectMeta{ + Name: "abc", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{}, + ResourceVersion: "2", + } + cases := map[string]struct { + enableManagedByLabel bool + old batch.Job update batch.Job wantErrs field.ErrorList @@ -2047,6 +2103,61 @@ func TestValidateJobUpdateStatus(t *testing.T) { }, }, }, + "update managed-by label; feature disabled": { + enableManagedByLabel: false, + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{}, + }, + Status: batch.JobStatus{ + Active: 2, + }, + }, + update: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "1", + Labels: map[string]string{ + batch.JobManagedByLabel: 
"custom-job-controller", + }, + }, + Status: batch.JobStatus{ + Active: 2, + }, + }, + }, + "update managed-by label; feature enabled": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{}, + }, + Status: batch.JobStatus{ + Active: 2, + }, + }, + update: batch.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "abc", + Namespace: metav1.NamespaceDefault, + ResourceVersion: "1", + Labels: map[string]string{ + batch.JobManagedByLabel: "custom-job-controller", + }, + }, + Status: batch.JobStatus{ + Active: 2, + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "metadata.labels.batch.kubernetes.io/managed-by"}, + }, + }, "nil ready and terminating": { old: batch.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -2138,9 +2249,226 @@ func TestValidateJobUpdateStatus(t *testing.T) { {Type: field.ErrorTypeInvalid, Field: "status.uncountedTerminatedPods.failed[4]"}, }, }, + "invalid addition of both Failed=True and Complete=True": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobComplete, + Status: api.ConditionTrue, + }, + { + Type: batch.JobFailed, + Status: api.ConditionTrue, + }, + }, + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + }, + }, + "invalid removal of terminal condition Failed=True": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailed, + Status: api.ConditionTrue, + }, + }, + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + }, + }, + "invalid removal of terminal condition Complete=True": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobComplete, + Status: api.ConditionTrue, + }, + }, + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + }, + }, + "invalid removal of terminal condition FailureTarget=True": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + Conditions: []batch.JobCondition{ + { + Type: batch.JobFailureTarget, + Status: api.ConditionTrue, + }, + }, + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.conditions"}, + }, + }, + "invalid setting of CompletionTime when there is not Complete condition": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + Status: batch.JobStatus{ + CompletionTime: &now, + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.completionTime"}, + }, + }, + "invalid setting of CompletedIndexes when non-indexed completion mode is used": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.NonIndexedCompletion), + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, 
+ Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.NonIndexedCompletion), + }, + Status: batch.JobStatus{ + CompletedIndexes: "0", + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.completedIndexes"}, + }, + }, + "invalid setting of FailedIndexes when not backoffLimitPerIndex": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + Status: batch.JobStatus{ + FailedIndexes: ptr.To("0"), + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.failedIndexes"}, + }, + }, + "invalid because completedIndexes set for non-indexed job": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.NonIndexedCompletion), + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.NonIndexedCompletion), + }, + Status: batch.JobStatus{ + CompletedIndexes: "0", + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.completedIndexes"}, + }, + }, + "invalid format for CompletedIndexes": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.IndexedCompletion), + }, + Status: batch.JobStatus{ + CompletedIndexes: "invalid format", + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.completedIndexes"}, + }, + }, + "invalid format for FailedIndexes": { + enableManagedByLabel: true, + old: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + }, + update: batch.Job{ + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Completions: ptr.To[int32](5), + CompletionMode: completionModePtr(batch.IndexedCompletion), + BackoffLimitPerIndex: pointer.Int32(1), + }, + Status: batch.JobStatus{ + FailedIndexes: ptr.To("invalid format"), + }, + }, + wantErrs: field.ErrorList{ + {Type: field.ErrorTypeInvalid, Field: "status.failedIndexes"}, + }, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobManagedByLabel, tc.enableManagedByLabel)() errs := ValidateJobUpdateStatus(&tc.update, &tc.old) if diff := cmp.Diff(tc.wantErrs, errs, ignoreErrValueDetail); diff != "" { t.Errorf("Unexpected errors (-want,+got):\n%s", diff) @@ -3587,3 +3915,88 @@ func TestTimeZones(t *testing.T) { } } } + +func TestValidateIndexesString(t *testing.T) { + testCases := map[string]struct { + indexesString string + completions int + wantError error + }{ + "empty is valid": { + indexesString: "", + completions: 6, + }, + "single number is valid": { + indexesString: 
"1", + completions: 6, + }, + "single interval is valid": { + indexesString: "1-3", + completions: 6, + }, + "mixed intervals valid": { + indexesString: "0,1-3,5,7-10", + completions: 12, + }, + "invalid due to extra space": { + indexesString: "0,1-3, 5", + completions: 6, + wantError: errors.New(`cannot parse integer index: " 5"`), + }, + "invalid due to too large index": { + indexesString: "0,1-3,5", + completions: 5, + wantError: errors.New(`too large index: "5"`), + }, + "invalid due to non-increasing order of intervals": { + indexesString: "1-3,0,5", + completions: 6, + wantError: errors.New(`non-increasing order, previous: 3, current: 0`), + }, + "invalid due to non-increasing order between intervals": { + indexesString: "0,0,5", + completions: 6, + wantError: errors.New(`non-increasing order, previous: 0, current: 0`), + }, + "invalid due to non-increasing order within interval": { + indexesString: "0,1-1,5", + completions: 6, + wantError: errors.New(`non-increasing order, previous: 1, current: 1`), + }, + "invalid due to starting with '-'": { + indexesString: "-1,0", + completions: 6, + wantError: errors.New(`cannot parse integer index: ""`), + }, + "invalid due to ending with '-'": { + indexesString: "0,1-", + completions: 6, + wantError: errors.New(`cannot parse integer index: ""`), + }, + "invalid due to repeated '-'": { + indexesString: "0,1--3", + completions: 6, + wantError: errors.New(`too many parts separated by '-' in "1--3"`), + }, + "invalid due to repeated ','": { + indexesString: "0,,1,3", + completions: 6, + wantError: errors.New(`cannot parse integer index: ""`), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + gotErr := validateIndexesFormat(tc.indexesString, tc.completions) + if tc.wantError == nil && gotErr != nil { + t.Errorf("unexpected error: %s", gotErr) + } else if tc.wantError != nil && gotErr == nil { + t.Errorf("missing error: %s", tc.wantError) + } else if tc.wantError != nil && gotErr != nil { + if diff := cmp.Diff(tc.wantError.Error(), gotErr.Error()); diff != "" { + t.Errorf("unexpected error, diff: %s", diff) + } + } + }) + } +} diff --git a/pkg/apis/core/validation/validation.go b/pkg/apis/core/validation/validation.go index bcbf344963a29..d3db2d013f710 100644 --- a/pkg/apis/core/validation/validation.go +++ b/pkg/apis/core/validation/validation.go @@ -352,6 +352,14 @@ func ValidateImmutableField(newVal, oldVal interface{}, fldPath *field.Path) fie return apimachineryvalidation.ValidateImmutableField(newVal, oldVal, fldPath) } +func ValidateImmutableLabel(newVal string, oldVal string, label string, fldPath *field.Path) field.ErrorList { + allErrs := field.ErrorList{} + if oldVal != newVal { + allErrs = append(allErrs, field.Invalid(fldPath.Child("labels", label), newVal, fieldImmutableErrorMsg)) + } + return allErrs +} + func ValidateImmutableAnnotation(newVal string, oldVal string, annotation string, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 1b0da9511cc5b..a98986e808225 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -168,7 +168,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - jm.enqueueSyncJobImmediately(logger, obj) + jm.addJob(logger, obj) }, UpdateFunc: func(oldObj, newObj interface{}) { 
jm.updateJob(logger, oldObj, newObj) @@ -447,6 +447,16 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) jm.enqueueSyncJobBatched(logger, job) } +func (jm *Controller) addJob(logger klog.Logger, obj interface{}) { + jm.enqueueSyncJobImmediately(logger, obj) + jobObj, ok := obj.(*batch.Job) + if ok { + if controllerName, uses := managedByExternalController(jobObj); uses { + metrics.JobByExternalControllerTotal.WithLabelValues(controllerName).Inc() + } + } +} + func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) { oldJob := old.(*batch.Job) curJob := cur.(*batch.Job) @@ -544,6 +554,16 @@ func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{} utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) return } + + // This is a performance optimization. When the Job object is managed by + // an external controller we don't need to queue the synchronization tasks. + if jobObj, ok := obj.(*batch.Job); ok { + if controllerName, managed := managedByExternalController(jobObj); managed { + logger.V(2).Info("Don't queue syncing of the job as it is managed by an external controller", "key", key, "uid", jobObj.UID, "controllerName", controllerName) + return + } + } + // TODO: Handle overlapping controllers better. Either disallow them at admission time or // deterministically avoid syncing controllers that fight over pods. Currently, we only // ensure that the same controller is synced for a given pod. When we periodically relist @@ -731,6 +751,17 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { } return err } + + // Skip syncing of the job it is managed by another controller. + // We cannot rely solely on skipping of queueing such jobs for synchronization, + // because it is possible a synchronization task is queued for a job, without + // the managed by label, but the job is quickly replaced by another job with + // the label. Then, the syncJob might be invoked for a job with the label. 
+ if controllerName, managed := managedByExternalController(sharedJob); managed { + logger.V(2).Info("Skip syncing the job as it is managed by an external controller", "key", key, "uid", sharedJob.UID, "controllerName", controllerName) + return nil + } + // make a copy so we don't mutate the shared cache job := *sharedJob.DeepCopy() @@ -1936,3 +1967,14 @@ func recordJobPodsCreationTotal(job *batch.Job, succeeded, failed int32) { metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Failed).Add(float64(failed)) } } + +func managedByExternalController(jobObj *batch.Job) (string, bool) { + if feature.DefaultFeatureGate.Enabled(features.JobManagedByLabel) { + if controllerName, found := jobObj.Labels[batch.JobManagedByLabel]; found { + if controllerName != batch.JobControllerName { + return controllerName, true + } + } + } + return "", false +} diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 7f33daa9b62a4..ada5605a7dea0 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -2292,6 +2292,135 @@ func TestSyncJobDeleted(t *testing.T) { } } +func TestSyncJobWhenManagedByLabel(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + now := metav1.Now() + validObjectMeta := metav1.ObjectMeta{ + Name: "foobar", + UID: uuid.NewUUID(), + Namespace: metav1.NamespaceDefault, + Labels: make(map[string]string), + } + validSelector := &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + } + validTemplate := v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + {Image: "foo/bar"}, + }, + }, + } + + baseJob := batch.Job{ + TypeMeta: metav1.TypeMeta{Kind: "Job"}, + ObjectMeta: validObjectMeta, + Spec: batch.JobSpec{ + Selector: validSelector, + Template: validTemplate, + Parallelism: ptr.To[int32](2), + Completions: ptr.To[int32](2), + BackoffLimit: ptr.To[int32](6), + }, + Status: batch.JobStatus{ + Active: 1, + Ready: ptr.To[int32](1), + StartTime: &now, + }, + } + + testCases := map[string]struct { + enableJobManagedByLabel bool + job batch.Job + wantStatus batch.JobStatus + }{ + "job with custom value of managed-by label; feature enabled; the status is unchanged": { + enableJobManagedByLabel: true, + job: func() batch.Job { + job := baseJob.DeepCopy() + job.Labels[batch.JobManagedByLabel] = "custom-managed-by" + return *job + }(), + wantStatus: baseJob.Status, + }, + "job with managed-by label equal job-controller.k8s.io; feature enabled; the status is updated": { + enableJobManagedByLabel: true, + job: func() batch.Job { + job := baseJob.DeepCopy() + job.Labels[batch.JobManagedByLabel] = "job-controller.k8s.io" + return *job + }(), + wantStatus: batch.JobStatus{ + Active: 2, + Ready: ptr.To[int32](0), + StartTime: &now, + Terminating: ptr.To[int32](0), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + "job with custom value of managed-by label; feature disabled; the status is updated": { + job: func() batch.Job { + job := baseJob.DeepCopy() + job.Labels[batch.JobManagedByLabel] = "custom-managed-by" + return *job + }(), + wantStatus: batch.JobStatus{ + Active: 2, + Ready: ptr.To[int32](0), + StartTime: &now, + Terminating: ptr.To[int32](0), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + "job without the managed-by label; feature enabled; the status is updated": { + enableJobManagedByLabel: true, + job: baseJob, + 
wantStatus: batch.JobStatus{ + Active: 2, + Ready: ptr.To[int32](0), + StartTime: &now, + Terminating: ptr.To[int32](0), + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedByLabel, tc.enableJobManagedByLabel)() + + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + job := &tc.job + + actual := job + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { + actual = job + return job, nil + } + if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil { + t.Fatalf("error %v while adding the %v job to the index", err, klog.KObj(job)) + } + + if err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)); err != nil { + t.Fatalf("error %v while reconciling the job %v", err, testutil.GetKey(job, t)) + } + + if diff := cmp.Diff(tc.wantStatus, actual.Status); diff != "" { + t.Errorf("Unexpected job status (-want,+got):\n%s", diff) + } + }) + } +} + func TestSyncJobWithJobPodFailurePolicy(t *testing.T) { _, ctx := ktesting.NewTestContext(t) now := metav1.Now() @@ -3963,6 +4092,7 @@ func TestUpdateJobRequeue(t *testing.T) { logger, ctx := ktesting.NewTestContext(t) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) cases := map[string]struct { + enableJobManagedByLabel bool oldJob *batch.Job updateFn func(job *batch.Job) wantRequeuedImmediately bool @@ -3975,6 +4105,19 @@ func TestUpdateJobRequeue(t *testing.T) { }, wantRequeuedImmediately: true, }, + "spec update; managed-by label used": { + enableJobManagedByLabel: true, + oldJob: func() *batch.Job { + job := newJob(1, 1, 1, batch.IndexedCompletion) + job.Labels = map[string]string{batch.JobManagedByLabel: "custom-job-controller"} + return job + }(), + updateFn: func(job *batch.Job) { + job.Spec.Suspend = ptr.To(false) + job.Generation++ + }, + wantRequeuedImmediately: false, + }, "status update": { oldJob: newJob(1, 1, 1, batch.IndexedCompletion), updateFn: func(job *batch.Job) { @@ -3985,6 +4128,7 @@ func TestUpdateJobRequeue(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedByLabel, tc.enableJobManagedByLabel)() manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index 39a82f53f9b9b..626a79d46d073 100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -71,6 +71,20 @@ var ( []string{"completion_mode", "result", "reason"}, ) + // JobByExternalControllerTotal tracks the number of Jobs that were created + // as managed by an external controller. 
+ // The value of the label controllerName corresponds to the value of the + // managed-by label. + JobByExternalControllerTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: JobControllerSubsystem, + Name: "jobs_by_external_controller_total", + Help: "The number of Jobs managed by an external controller", + StabilityLevel: metrics.ALPHA, + }, + []string{"controller_name"}, + ) + // JobPodsFinished records the number of finished Pods that the job controller // finished tracking. // It only applies to Jobs that were created while the feature gate @@ -195,5 +209,6 @@ func Register() { legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal) legacyregistry.MustRegister(JobFinishedIndexesTotal) legacyregistry.MustRegister(JobPodsCreationTotal) + legacyregistry.MustRegister(JobByExternalControllerTotal) }) } diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 26f215a56da80..53dc1d31374ce 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -350,6 +350,13 @@ const ( // Allows users to specify counting of failed pods per index. JobBackoffLimitPerIndex featuregate.Feature = "JobBackoffLimitPerIndex" + // owner: @mimowo + // kep: https://kep.k8s.io/4368 + // beta: v1.30 + // + // Allows to delegate reconciliation of a Job object to an external controller. + JobManagedByLabel featuregate.Feature = "JobManagedByLabel" + // owner: @mimowo // kep: https://kep.k8s.io/3329 // alpha: v1.25 @@ -1006,6 +1013,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS JobBackoffLimitPerIndex: {Default: true, PreRelease: featuregate.Beta}, + JobManagedByLabel: {Default: true, PreRelease: featuregate.Beta}, + JobPodFailurePolicy: {Default: true, PreRelease: featuregate.Beta}, JobPodReplacementPolicy: {Default: true, PreRelease: featuregate.Beta}, diff --git a/staging/src/k8s.io/api/batch/v1/types.go b/staging/src/k8s.io/api/batch/v1/types.go index 53fdf3c8d01bf..3a1463f5d3828 100644 --- a/staging/src/k8s.io/api/batch/v1/types.go +++ b/staging/src/k8s.io/api/batch/v1/types.go @@ -47,6 +47,10 @@ const ( // Historically the job controller uses unprefixed labels for job-name and controller-uid and // Kubernetes continutes to recognize those unprefixed labels for consistency. JobNameLabel = labelPrefix + "job-name" + // Label indicating the controller that manages a Job. When the label is + // absent on a job object, or its value equals "job-controller.k8s.io" then + // the Job is reconciled by the built-in Job controller. + JobManagedByLabel = labelPrefix + "managed-by" // ControllerUid is used to programatically get pods corresponding to a Job. // There is a corresponding label without the batch.kubernetes.io that we support for legacy reasons. ControllerUidLabel = labelPrefix + "controller-uid" @@ -57,6 +61,9 @@ const ( // to the pod, which don't count towards the backoff limit, according to the // pod failure policy. When the annotation is absent zero is implied. JobIndexIgnoredFailureCountAnnotation = labelPrefix + "job-index-ignored-failure-count" + // JobControllerName reserved value for the managed-by label for the built-in + // Job controller. 
+ JobControllerName = "job-controller.k8s.io" ) // +genclient diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index a2337f666d418..78648057e4809 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -52,6 +52,8 @@ import ( "k8s.io/utils/pointer" "k8s.io/utils/ptr" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" ) @@ -564,6 +566,107 @@ done`} gomega.Expect(job.Status.Succeeded).Should(gomega.Equal(int32(1))) }) + /* + Testcase: Skip reconciling a job that has a custom managed-by label + Description: First, we create a suspended job with a custom value of the + managed-by, and verify the Job is not reconciled by the built-in Job + controller. If it was, the startTime status field, and the Suspended + conditions were added. Next, we simulate that the external controller + unsuspends the Job, and verify it remains not reconciled as the Active + status field is not incremented. + */ + ginkgo.It("should not reconcile a job with custom managed-by label", func(ctx context.Context) { + parallelism := int32(2) + completions := int32(2) + backoffLimit := int32(6) // default value + + ginkgo.By("Creating a suspended job with a custom managed-by label") + job := e2ejob.NewTestJob("succeed", "custom-managed-by", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) + if job.Labels == nil { + job.Labels = make(map[string]string) + } + job.Labels[batchv1.JobManagedByLabel] = "custom-job-controller" + job.Spec.Suspend = ptr.To(true) + + job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Verify the job is not reconciled, and does not get the Suspended condition") + gomega.Consistently(func() string { + newJob, err := e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err) + return cmp.Diff(batchv1.JobStatus{}, newJob.Status, cmpopts.EquateEmpty()) + }).WithPolling(100 * time.Millisecond).WithTimeout(2 * time.Second).Should(gomega.BeEmpty()) + + ginkgo.By("Unsuspend the job simulating an external controller") + job.Spec.Suspend = ptr.To(false) + job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err) + + ginkgo.By("Verify the unsuspended job remains not reconciled") + // Let's wait 2s, which doubles the time between running the Job synchronization + // workers. + gomega.Consistently(func() string { + newJob, err := e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err) + return cmp.Diff(batchv1.JobStatus{}, newJob.Status, cmpopts.EquateEmpty()) + }).WithPolling(100 * time.Millisecond).WithTimeout(2 * time.Second).Should(gomega.BeEmpty()) + }) + + /* + Testcase: Skip reconciling a replacement job that has a custom managed-by label + Description: First we create a job without the managed-by label, so that + the synchronization is queued. Next, we quickly replace the job with + another, using the same namespace and name, but with a custom value of + the label. We verify that the replacement job is not reconciled. 
+ */ + ginkgo.It("should not reconcile a replacement job with custom managed-by label", func(ctx context.Context) { + parallelism := int32(1) + completions := int32(1) + backoffLimit := int32(6) // default value + + baseJob := e2ejob.NewTestJob("succeed", "custom-managed-by", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit) + + ginkgo.By("Creating a suspended job without a custom managed-by label") + job := baseJob.DeepCopy() + job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Await for the job pod to be created") + err = e2ejob.WaitForJobPodsSucceeded(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1) + framework.ExpectNoError(err, "failed to delete job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Delete the job managed by the built-in controller") + err = f.ClientSet.BatchV1().Jobs(f.Namespace.Name).Delete(ctx, job.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err, "failed to delete job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Quickly replace the deleted job with a job with custom managed-by label", func() { + ginkgo.By("Await for the deleted job to be gone from the API server") + gomega.Eventually(func() bool { + _, err := e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + return apierrors.IsNotFound(err) + }).WithTimeout(time.Minute).WithPolling(100 * time.Millisecond).Should(gomega.BeTrueBecause("job should be gone from the API server")) + framework.ExpectNoError(err, "failed to delete a job in namespace: %s", f.Namespace.Name) + + ginkgo.By("Re-create the job, but now with managed-by label") + job = baseJob.DeepCopy() + if job.Labels == nil { + job.Labels = make(map[string]string) + } + job.Labels[batchv1.JobManagedByLabel] = "custom-job-controller" + job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job) + framework.ExpectNoError(err) + }) + + ginkgo.By("Verify the Job is not reconciled as the status remains empty") + gomega.Consistently(func() string { + newJob, err := e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name) + framework.ExpectNoError(err) + emptyJobStatus := batchv1.JobStatus{} + return cmp.Diff(emptyJobStatus, newJob.Status, cmpopts.EquateEmpty()) + }).WithTimeout(2 * time.Second).WithPolling(500 * time.Millisecond).Should(gomega.BeEmpty()) + }) + /* Testcase: Ensure that the pods associated with the job are removed once the job is deleted Description: Create a job and ensure the associated pod count is equal to parallelism count. 
Delete the diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 4067c82b9949d..09ddcc92c60c3 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1174,6 +1174,263 @@ func TestBackoffLimitPerIndex(t *testing.T) { } } +func TestManagedByLabel(t *testing.T) { + podTemplateSpec := v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main-container", + Image: "foo", + ImagePullPolicy: v1.PullIfNotPresent, + TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError, + }, + }, + }, + } + testCases := map[string]struct { + jobManagedByLabelEnabled bool + job batchv1.Job + customStatus *batchv1.JobStatus + wantReconciledByBuiltInController bool + wantJobByExternalControllerTotalMetric *metricLabelsWithValue + }{ + "the Job controller reconciles jobs without the managed-by label": { + jobManagedByLabelEnabled: true, + job: batchv1.Job{ + Spec: batchv1.JobSpec{ + Template: podTemplateSpec, + }, + }, + wantReconciledByBuiltInController: true, + }, + "the Job controller reconciles jobs with the managed-by label equal to job-controller.k8s.io": { + jobManagedByLabelEnabled: true, + job: batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + batchv1.JobManagedByLabel: "job-controller.k8s.io", + }, + }, + Spec: batchv1.JobSpec{ + Template: podTemplateSpec, + }, + }, + wantReconciledByBuiltInController: true, + }, + "the Job controller does not reconcile an unsuspended with the custom value of managed-by; feature disabled": { + jobManagedByLabelEnabled: false, + job: batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + batchv1.JobManagedByLabel: "custom-job-controller", + }, + }, + Spec: batchv1.JobSpec{ + Template: podTemplateSpec, + }, + }, + wantReconciledByBuiltInController: true, + }, + "the Job controller does not reconcile an unsuspended with the custom value of managed-by": { + jobManagedByLabelEnabled: true, + job: batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + batchv1.JobManagedByLabel: "custom-job-controller", + }, + }, + Spec: batchv1.JobSpec{ + Suspend: ptr.To(false), + Template: podTemplateSpec, + }, + }, + customStatus: &batchv1.JobStatus{ + Active: 1, + }, + wantReconciledByBuiltInController: false, + wantJobByExternalControllerTotalMetric: &metricLabelsWithValue{ + Labels: []string{"custom-job-controller"}, + Value: 1, + }, + }, + "the Job controller does not reconcile a suspended with the custom value of managed-by": { + jobManagedByLabelEnabled: true, + job: batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + batchv1.JobManagedByLabel: "custom-job-controller", + }, + }, + Spec: batchv1.JobSpec{ + Suspend: ptr.To(true), + Template: podTemplateSpec, + }, + }, + wantReconciledByBuiltInController: false, + wantJobByExternalControllerTotalMetric: &metricLabelsWithValue{ + Labels: []string{"custom-job-controller"}, + Value: 1, + }, + }, + } + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + resetMetrics() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedByLabel, test.jobManagedByLabelEnabled)() + + closeFn, restConfig, clientSet, ns := setup(t, "managed-by") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + defer func() { + cancel() + }() + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job) + if err != nil { + t.Fatalf("Error %q while creating the job 
%q", err, jobObj.Name) + } + + if test.customStatus != nil { + if test.wantReconciledByBuiltInController { + t.Fatal("The test should not set custom status if built-in reconciler is enabled") + } + jobCopy := jobObj.DeepCopy() + jobCopy.Status = *test.customStatus + jobObj, err = clientSet.BatchV1().Jobs(ns.Name).UpdateStatus(ctx, jobCopy, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to update Job: %v", err) + } + } + + if test.wantReconciledByBuiltInController { + cnt := int(*jobObj.Spec.Parallelism) + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: cnt, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + }) + if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, cnt); err != nil { + t.Fatalf("error %v when setting phase on job %v pods", err, klog.KObj(jobObj)) + } + validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete) + } else { + oldStatus := jobObj.Status + time.Sleep(2 * time.Second) + jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get the new Job: %v", err) + } + if diff := cmp.Diff(oldStatus, jobObj.Status); diff != "" { + t.Fatalf("Unexpected status (-want/+got): %s", diff) + } + } + if test.wantJobByExternalControllerTotalMetric != nil { + validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, *test.wantJobByExternalControllerTotalMetric) + } + }) + } +} + +func TestManagedByLabel_RecreatedJob(t *testing.T) { + podTemplateSpec := v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "main-container", + Image: "foo", + ImagePullPolicy: v1.PullIfNotPresent, + TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError, + }, + }, + }, + } + + resetMetrics() + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedByLabel, true)() + + closeFn, restConfig, clientSet, ns := setup(t, "managed-by-recreate-job") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) + defer func() { + cancel() + }() + baseJob := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "custom-job-test", + Namespace: ns.Name, + Labels: make(map[string]string), + }, + Spec: batchv1.JobSpec{ + Completions: ptr.To[int32](1), + Parallelism: ptr.To[int32](1), + Template: podTemplateSpec, + }, + } + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &baseJob) + if err != nil { + t.Fatalf("Error %q while creating the job %q", err, jobObj.Name) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: 1, + Ready: ptr.To[int32](0), + Terminating: ptr.To[int32](0), + }) + if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err) + } + + validateJobSucceeded(ctx, t, clientSet, jobObj) + + err = clientSet.CoreV1().Pods(jobObj.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{}) + if err != nil { + t.Fatalf("Error: '%v' while deleting the pods", err) + } + + jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace) + err = jobClient.Delete(ctx, jobObj.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Failed to delete job: %v", err) + } + jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get deleted job: %v", err) + } + + jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) { + 
j.Finalizers = []string{} + }) + if err != nil { + t.Fatalf("Failed to clean up the orphan finalizer: %v", err) + } + + // Wait for pods to start up. + err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(c context.Context) (done bool, err error) { + _, e := jobClient.Get(c, jobObj.Name, metav1.GetOptions{}) + if apierrors.IsNotFound(e) { + return true, nil + } + return false, e + }) + if err != nil { + t.Fatalf("waiting for job %#v to be deleted: %v", jobObj, err) + } + + jobWithManagedBy := baseJob.DeepCopy() + jobWithManagedBy.Labels[batchv1.JobManagedByLabel] = "custom-job-controller" + jobObj, err = createJobWithDefaults(ctx, clientSet, ns.Name, jobWithManagedBy) + if err != nil { + t.Fatalf("Error %q while creating the job %q", err, jobObj.Name) + } + + time.Sleep(2 * time.Second) + jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get deleted job: %v", err) + } + if diff := cmp.Diff(batchv1.JobStatus{}, jobObj.Status); diff != "" { + t.Fatalf("unexpected status, diff: %s", diff) + } +} + func getIndexFailureCount(p *v1.Pod) (int, error) { if p.Annotations == nil { return 0, errors.New("no annotations found") @@ -3152,6 +3409,7 @@ func resetMetrics() { metrics.PodFailuresHandledByFailurePolicy.Reset() metrics.JobFinishedIndexesTotal.Reset() metrics.JobPodsCreationTotal.Reset() + metrics.JobByExternalControllerTotal.Reset() } func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
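
Usage sketch (not part of the patch): the snippet below shows how an external controller could create a Job it owns and later list only the Jobs it manages by selecting on the new batch.kubernetes.io/managed-by label. It is a minimal illustration under assumptions — the controller name "my-operator.example.com", namespace "demo", and container image are made up, and it assumes a client-go/api build that already contains the batchv1.JobManagedByLabel constant added above and a cluster with the JobManagedByLabel feature gate enabled. Per the patch, an absent label or the reserved value "job-controller.k8s.io" keeps the built-in Job controller in charge; any other value makes it skip reconciliation entirely.

// managed_by_sketch.go — illustrative only, not part of the patch.
package main

import (
	"context"
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/utils/ptr"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)
	ctx := context.Background()

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "externally-managed",
			Namespace: "demo",
			// Any value other than "job-controller.k8s.io" tells the built-in
			// Job controller to skip this Job when the feature gate is enabled.
			Labels: map[string]string{batchv1.JobManagedByLabel: "my-operator.example.com"},
		},
		Spec: batchv1.JobSpec{
			Completions: ptr.To[int32](1),
			Parallelism: ptr.To[int32](1),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{Name: "main", Image: "busybox", Command: []string{"true"}},
					},
				},
			},
		},
	}
	if _, err := client.BatchV1().Jobs("demo").Create(ctx, job, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// The external controller can scope its watch/list to the Jobs it owns
	// by filtering on the managed-by label value it reserves for itself.
	sel := batchv1.JobManagedByLabel + "=my-operator.example.com"
	jobs, err := client.BatchV1().Jobs("demo").List(ctx, metav1.ListOptions{LabelSelector: sel})
	if err != nil {
		panic(err)
	}
	fmt.Printf("managing %d job(s)\n", len(jobs.Items))
}

Because the label is validated as immutable on update (see validateJobMetaUpdate above), ownership of a Job cannot silently move between controllers after creation; an external controller that sets its own value at creation time retains sole responsibility for updating that Job's status.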