diff --git a/cmd/importer/main.go b/cmd/importer/main.go index daff6b49eb..50522530e6 100644 --- a/cmd/importer/main.go +++ b/cmd/importer/main.go @@ -23,11 +23,13 @@ import ( "github.com/spf13/cobra" "github.com/spf13/pflag" + "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log/zap" - "sigs.k8s.io/kueue/cmd/importer/check" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/pod" "sigs.k8s.io/kueue/cmd/importer/util" "sigs.k8s.io/kueue/pkg/util/useragent" ) @@ -38,7 +40,7 @@ const ( QueueMappingFlag = "queuemapping" QueueLabelFlag = "queuelabel" QPSFlag = "qps" - BurstFlag = "bust" + BurstFlag = "burst" ) var ( @@ -65,7 +67,11 @@ var ( } kubeConfig.Burst = bust - c, err := client.New(kubeConfig, client.Options{}) + if err := kueue.AddToScheme(scheme.Scheme); err != nil { + return err + } + + c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme}) if err != nil { return err } @@ -95,24 +101,66 @@ func init() { rootCmd.PersistentFlags().Float32(QPSFlag, 5, "") rootCmd.PersistentFlags().Int(BurstFlag, 10, "") + rootCmd.AddCommand( + &cobra.Command{ + Use: "check", + RunE: checkCmd, + }, + &cobra.Command{ + Use: "import", + RunE: importCmd, + }, + ) } func main() { - ctrl.SetLogger(zap.New(zap.UseDevMode(true))) + ctrl.SetLogger(zap.New(zap.UseDevMode(true), zap.ConsoleEncoder())) err := rootCmd.Execute() if err != nil { os.Exit(1) } } +func loadMappingCache(ctx context.Context, cmd *cobra.Command) (*util.MappingCache, error) { + flags := cmd.Flags() + namespaces, err := flags.GetStringSlice(NamespaceFlag) + if err != nil { + return nil, err + } + queueLabel, err := flags.GetString(QueueLabelFlag) + if err != nil { + return nil, err + } + mapping, err := flags.GetStringToString(QueueMappingFlag) + if err != nil { + return nil, err + } + return util.LoadMappingCache(ctx, k8sClient, namespaces, queueLabel, mapping) +} + 
func checkCmd(cmd *cobra.Command, _ []string) error { log := ctrl.Log.WithName("check") ctx := ctrl.LoggerInto(context.Background(), log) - flags := cmd.Flags() - namespaces, _ := flags.GetStringSlice(NamespaceFlag) - queueLabel, _ := flags.GetString(QueueLabelFlag) - mapping, _ := flags.GetStringToString(QueueMappingFlag) - mappingCache, _ := util.LoadMappingCache(ctx, k8sClient, namespaces, queueLabel, mapping) + mappingCache, err := loadMappingCache(ctx, cmd) + if err != nil { + return err + } + + return pod.Check(ctx, k8sClient, mappingCache) +} + +func importCmd(cmd *cobra.Command, _ []string) error { + log := ctrl.Log.WithName("import") + ctx := ctrl.LoggerInto(context.Background(), log) + + mappingCache, err := loadMappingCache(ctx, cmd) + if err != nil { + return err + } + + if err = pod.Check(ctx, k8sClient, mappingCache); err != nil { + return err + } - return check.Check(ctx, k8sClient, mappingCache) + return pod.Import(ctx, k8sClient, mappingCache) } diff --git a/cmd/importer/check/check.go b/cmd/importer/pod/check.go similarity index 50% rename from cmd/importer/check/check.go rename to cmd/importer/pod/check.go index 0d4b4f5d0f..d9bea79d83 100644 --- a/cmd/importer/check/check.go +++ b/cmd/importer/pod/check.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package check +package pod import ( "context" @@ -29,45 +29,34 @@ import ( "sigs.k8s.io/kueue/cmd/importer/util" ) -var ( - ErrNoMappingKey = errors.New("no mapping key found") - ErrNoMapping = errors.New("no mapping found") - ErrLQNotFound = errors.New("localqueue not found") - ErrCQNotFound = errors.New("clusterqueue not found") -) - func Check(ctx context.Context, c client.Client, mappingCache *util.MappingCache) error { ch := make(chan corev1.Pod) go util.PushPods(ctx, c, mappingCache.Namespaces, mappingCache.QueueLabel, ch) - errs := util.ConcurrentProcessPod(ch, func(p *corev1.Pod) error { + summary := util.ConcurrentProcessPod(ch, func(p *corev1.Pod) error { log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) log.V(3).Info("Checking") - mappingKey, found := p.Labels[mappingCache.QueueLabel] - if !found { - return fmt.Errorf("%s/%s: %w", p.Namespace, p.Name, ErrNoMappingKey) - } - - queueName, found := mappingCache.Mapping[mappingKey] - if !found { - return fmt.Errorf("%s/%s: %w", p.Namespace, p.Name, ErrNoMapping) - } - nqQueues, found := mappingCache.LocalQueues[p.Namespace] - if !found { - return fmt.Errorf("%s/%s queue: %s: %w", p.Namespace, p.Name, queueName, ErrLQNotFound) - } - - lq, found := nqQueues[queueName] - if !found { - return fmt.Errorf("%s/%s queue: %s: %w", p.Namespace, p.Name, queueName, ErrLQNotFound) - } - - _, found = mappingCache.ClusterQueues[string(lq.Spec.ClusterQueue)] - if !found { - return fmt.Errorf("%s/%s cluster queue: %s: %w", p.Namespace, p.Name, queueName, ErrCQNotFound) + _, err := mappingCache.ClusterQueue(p) + if err != nil { + return err } + // do some additional checks like: + // - the preemption policies set in the cq's + // - (maybe) the resources managed by the queues + // - should have one and only one flavor + // - the flavor exists return nil }) + + fmt.Printf("Checked %d pods\n", summary.TotalPods) + fmt.Printf("Failed %d pods\n", summary.FailedPods) + for e, pods := range summary.ErrorsForPods { + 
fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0]) + } + var errs []error + for _, e := range summary.Errors { + errs = append(errs, e) + } return errors.Join(errs...) } diff --git a/cmd/importer/check/check_test.go b/cmd/importer/pod/check_test.go similarity index 96% rename from cmd/importer/check/check_test.go rename to cmd/importer/pod/check_test.go index 13fd966af2..f0f77a4275 100644 --- a/cmd/importer/check/check_test.go +++ b/cmd/importer/pod/check_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package check +package pod import ( "context" @@ -55,7 +55,7 @@ func TestCheckNamespace(t *testing.T) { pods: []corev1.Pod{ *basePodWrapper.Clone().Obj(), }, - wantError: ErrNoMapping, + wantError: util.ErrNoMapping, }, "no local queue": { pods: []corev1.Pod{ @@ -64,7 +64,7 @@ func TestCheckNamespace(t *testing.T) { mapping: map[string]string{ "q1": "lq1", }, - wantError: ErrLQNotFound, + wantError: util.ErrLQNotFound, }, "no cluster queue": { pods: []corev1.Pod{ @@ -76,7 +76,7 @@ func TestCheckNamespace(t *testing.T) { localQueues: []kueue.LocalQueue{ *baseLocalQueue.Obj(), }, - wantError: ErrCQNotFound, + wantError: util.ErrCQNotFound, }, "all found": { pods: []corev1.Pod{ @@ -114,5 +114,4 @@ func TestCheckNamespace(t *testing.T) { } }) } - } diff --git a/cmd/importer/pod/podimport.go b/cmd/importer/pod/podimport.go new file mode 100644 index 0000000000..3eb5ab2d7e --- /dev/null +++ b/cmd/importer/pod/podimport.go @@ -0,0 +1,203 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "errors" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/workload" +) + +func Import(ctx context.Context, c client.Client, mappingCache *util.MappingCache) error { + ch := make(chan corev1.Pod) + go util.PushPods(ctx, c, mappingCache.Namespaces, mappingCache.QueueLabel, ch) + summary := util.ConcurrentProcessPod(ch, func(p *corev1.Pod) error { + log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) + log.V(3).Info("Importing") + + lq, err := mappingCache.LocalQueue(p) + if err != nil { + return err + } + + oldLq, found := p.Labels[constants.QueueLabel] + if !found { + if err := addLabels(ctx, c, p, lq.Name); err != nil { + return fmt.Errorf("cannot add queue label: %w", err) + } + } else if oldLq != lq.Name { + return fmt.Errorf("another local queue name is set %q expecting %q", oldLq, lq.Name) + } + + kp := pod.FromObject(p) + // Note: the recorder is not used for single pods, we can just pass nil for now. 
+		wl, err := kp.ConstructComposableWorkload(ctx, c, nil)
+		if err != nil {
+			return fmt.Errorf("construct workload: %w", err)
+		}
+		if err := createWorkload(ctx, c, wl); err != nil {
+			return fmt.Errorf("creating workload: %w", err)
+
+		}
+
+		// make its admission and update its status
+
+		info := workload.NewInfo(wl)
+		cq := mappingCache.ClusterQueues[string(lq.Spec.ClusterQueue)]
+		admission := kueue.Admission{
+			ClusterQueue: kueue.ClusterQueueReference(cq.Name),
+			PodSetAssignments: []kueue.PodSetAssignment{
+				{
+					Name:          info.TotalRequests[0].Name,
+					Flavors:       make(map[corev1.ResourceName]kueue.ResourceFlavorReference),
+					ResourceUsage: info.TotalRequests[0].Requests.ToResourceList(),
+					Count:         ptr.To[int32](1),
+				},
+			},
+		}
+		flv := cq.Spec.ResourceGroups[0].Flavors[0].Name
+		for r := range info.TotalRequests[0].Requests {
+			admission.PodSetAssignments[0].Flavors[r] = flv
+		}
+
+		wl.Status.Admission = &admission
+		reservedCond := metav1.Condition{
+			Type:    kueue.WorkloadQuotaReserved,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name),
+		}
+		apimeta.SetStatusCondition(&wl.Status.Conditions, reservedCond)
+		admittedCond := metav1.Condition{
+			Type:    kueue.WorkloadAdmitted,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name),
+		}
+		apimeta.SetStatusCondition(&wl.Status.Conditions, admittedCond)
+		return admitWorkload(ctx, c, wl)
+	})
+
+	fmt.Printf("Checked %d pods\n", summary.TotalPods)
+	fmt.Printf("Failed %d pods\n", summary.FailedPods)
+	for e, pods := range summary.ErrorsForPods {
+		fmt.Printf("%dx: %s\n\t%v\n", len(pods), e, pods)
+	}
+	var errs []error
+	for _, e := range summary.Errors {
+		errs = append(errs, e)
+	}
+	return errors.Join(errs...)
+}
+
+func checkError(err error) (retry, reload bool, timeout time.Duration) {
+	retrySeconds, retry := apierrors.SuggestsClientDelay(err)
+	if retry {
+		return true, false, time.Duration(retrySeconds) * time.Second
+	}
+
+	if apierrors.IsConflict(err) {
+		return true, true, 0
+	}
+	return false, false, 0
+}
+
+func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string) error {
+	p.Labels[constants.QueueLabel] = queue
+	p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+
+	err := c.Update(ctx, p)
+	retry, reload, timeout := checkError(err)
+
+	for retry {
+		if timeout >= 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		if reload {
+			err = c.Get(ctx, client.ObjectKeyFromObject(p), p)
+			if err != nil {
+				retry, reload, timeout = checkError(err)
+				continue
+			}
+			p.Labels[constants.QueueLabel] = queue
+			p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+		}
+		err = c.Update(ctx, p)
+		retry, reload, timeout = checkError(err)
+	}
+	return err
+}
+
+func createWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error {
+	err := c.Create(ctx, wl)
+	if apierrors.IsAlreadyExists(err) {
+		return nil
+	}
+	retry, _, timeout := checkError(err)
+	for retry {
+		if timeout >= 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		err = c.Create(ctx, wl)
+		retry, _, timeout = checkError(err)
+	}
+	return err
+}
+
+func admitWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error {
+	err := workload.ApplyAdmissionStatus(ctx, c, wl, false)
+	if apierrors.IsAlreadyExists(err) {
+		return nil
+	}
+	retry, _, timeout := checkError(err)
+	for retry {
+		if timeout >= 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		err = workload.ApplyAdmissionStatus(ctx, c, wl, false)
+		retry, _, timeout = checkError(err)
+	}
+	return err
+} diff --git a/cmd/importer/pod/podimport_test.go b/cmd/importer/pod/podimport_test.go new file mode 100644 index 0000000000..c4a83a618e --- /dev/null +++ b/cmd/importer/pod/podimport_test.go @@ -0,0 +1,162 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +func TestImportNamespace(t *testing.T) { + basePodWrapper := testingpod.MakePod("pod", testingNamespace). + UID("pod"). + Label(testingQueueLabel, "q1"). + Image("img", nil). + Request(corev1.ResourceCPU, "1") + + baseWlWrapper := utiltesting.MakeWorkload("pod-pod-b17ab", testingNamespace). + ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "pod"). + Label(constants.JobUIDLabel, "pod"). + Finalizers(kueue.ResourceInUseFinalizerName). + Queue("lq1"). + PodSets(*utiltesting.MakePodSet("main", 1). + Image("img"). + Request(corev1.ResourceCPU, "1"). + Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq1"). 
+ Assignment(corev1.ResourceCPU, "f1", "1"). + Obj()). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: "Imported into ClusterQueue cq1", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: "Imported into ClusterQueue cq1", + }) + + baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1") + baseClusterQueue := utiltesting.MakeClusterQueue("cq1"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("f1").Resource(corev1.ResourceCPU, "1", "0").Obj()) + + podCmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + } + + wlCmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.IgnoreFields(metav1.Condition{}, "ObservedGeneration", "LastTransitionTime"), + } + + cases := map[string]struct { + pods []corev1.Pod + clusteQueues []kueue.ClusterQueue + localQueues []kueue.LocalQueue + mapping map[string]string + + wantPods []corev1.Pod + wantWorkloads []kueue.Workload + wantError error + }{ + + "create one": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *baseClusterQueue.Obj(), + }, + + wantPods: []corev1.Pod{ + *basePodWrapper.Clone(). + Label(constants.QueueLabel, "lq1"). + Label(pod.ManagedLabelKey, pod.ManagedLabelValue). + Obj(), + }, + + wantWorkloads: []kueue.Workload{ + *baseWlWrapper.Clone().Obj(), + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + podsList := corev1.PodList{Items: tc.pods} + cqList := kueue.ClusterQueueList{Items: tc.clusteQueues} + lqList := kueue.LocalQueueList{Items: tc.localQueues} + + builder := utiltesting.NewClientBuilder(). 
+ WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}).WithStatusSubresource(&kueue.Workload{}). + WithLists(&podsList, &cqList, &lqList) + + client := builder.Build() + ctx := context.Background() + + mpc, _ := util.LoadMappingCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping) + gotErr := Import(ctx, client, mpc) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + } + + err := client.List(ctx, &podsList) + if err != nil { + t.Errorf("Unexpected list pod error: %s", err) + } + if diff := cmp.Diff(tc.wantPods, podsList.Items, podCmpOpts...); diff != "" { + t.Errorf("Unexpected pods (-want/+got)\n%s", diff) + } + + wlList := kueue.WorkloadList{} + err = client.List(ctx, &wlList) + if err != nil { + t.Errorf("Unexpected list workloads error: %s", err) + } + if diff := cmp.Diff(tc.wantWorkloads, wlList.Items, wlCmpOpts...); diff != "" { + t.Errorf("Unexpected workloads (-want/+got)\n%s", diff) + } + + }) + } +} diff --git a/cmd/importer/util/util.go b/cmd/importer/util/util.go index 740c40e5c7..9d9df0d2fc 100644 --- a/cmd/importer/util/util.go +++ b/cmd/importer/util/util.go @@ -18,6 +18,7 @@ package util import ( "context" + "errors" "fmt" "maps" "slices" @@ -37,6 +38,13 @@ const ( ConcurentProcessors = 8 ) +var ( + ErrNoMappingKey = errors.New("no mapping key found") + ErrNoMapping = errors.New("no mapping found") + ErrLQNotFound = errors.New("localqueue not found") + ErrCQNotFound = errors.New("clusterqueue not found") +) + func ListOptions(namespace, queueLabel, continueToken string) []client.ListOption { opts := []client.ListOption{ client.InNamespace(namespace), @@ -81,56 +89,114 @@ func LoadMappingCache(ctx context.Context, c client.Client, namespaces []string, return &ret, nil } +func (mappingCache *MappingCache) LocalQueue(p *corev1.Pod) (*kueue.LocalQueue, error) { + mappingKey, found := 
p.Labels[mappingCache.QueueLabel] + if !found { + return nil, ErrNoMappingKey + } + + queueName, found := mappingCache.Mapping[mappingKey] + if !found { + return nil, fmt.Errorf("%s: %w", mappingKey, ErrNoMapping) + } + + nqQueues, found := mappingCache.LocalQueues[p.Namespace] + if !found { + return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound) + } + + lq, found := nqQueues[queueName] + if !found { + return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound) + } + return lq, nil +} + +func (mappingCache *MappingCache) ClusterQueue(p *corev1.Pod) (*kueue.ClusterQueue, error) { + lq, err := mappingCache.LocalQueue(p) + if err != nil { + return nil, err + } + queueName := string(lq.Spec.ClusterQueue) + cq, found := mappingCache.ClusterQueues[queueName] + if !found { + return nil, fmt.Errorf("cluster queue: %s: %w", queueName, ErrCQNotFound) + } + return cq, nil +} + func PushPods(ctx context.Context, c client.Client, namespaces []string, queueLabel string, ch chan<- corev1.Pod) error { defer close(ch) for _, ns := range namespaces { + lst := &corev1.PodList{} + log := ctrl.LoggerFrom(ctx).WithValues("namespace", ns) + log.Info("Start") + page := 0 for { - log := ctrl.LoggerFrom(ctx).WithValues("namespace", ns) - log.V(3).Info("Start") - lst := &corev1.PodList{} err := c.List(ctx, lst, ListOptions(ns, queueLabel, lst.Continue)...) 
if err != nil { log.V(5).Error(err, "list") - return fmt.Errorf("loading local queues in namespace %s: %w", ns, err) + return fmt.Errorf("listing pods in %s, page %d: %w", ns, page, err) } for _, p := range lst.Items { ch <- p } - if len(lst.Items) < ListLength { + if lst.Continue == "" { log.V(2).Info("No more items") break } + page++ } } return nil } -func ConcurrentProcessPod(ch <-chan corev1.Pod, f func(p *corev1.Pod) error) []error { - var errs []error +type Result struct { + Pod string + Err error +} + +type ProcessSummary struct { + TotalPods int + FailedPods int + ErrorsForPods map[string][]string + Errors []error +} + +func ConcurrentProcessPod(ch <-chan corev1.Pod, f func(p *corev1.Pod) error) ProcessSummary { wg := sync.WaitGroup{} - errCh := make(chan error) + resultCh := make(chan Result) wg.Add(ConcurentProcessors) for i := 0; i < ConcurentProcessors; i++ { go func() { defer wg.Done() for pod := range ch { - if err := f(&pod); err != nil { - errCh <- err - - } + err := f(&pod) + resultCh <- Result{Pod: client.ObjectKeyFromObject(&pod).String(), Err: err} } }() } go func() { wg.Wait() - close(errCh) + close(resultCh) }() - for err := range errCh { - errs = append(errs, err) + ps := ProcessSummary{ + ErrorsForPods: make(map[string][]string), + } + for result := range resultCh { + ps.TotalPods++ + if result.Err != nil { + ps.FailedPods++ + estr := result.Err.Error() + if _, found := ps.ErrorsForPods[estr]; !found { + ps.Errors = append(ps.Errors, result.Err) + } + ps.ErrorsForPods[estr] = append(ps.ErrorsForPods[estr], result.Pod) + } } - return errs + return ps } diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index f5c79a33b9..a15c00566d 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -138,7 +138,7 @@ var ( _ jobframework.ComposableJob = (*Pod)(nil) ) -func fromObject(o runtime.Object) *Pod { +func FromObject(o runtime.Object) *Pod { out := 
Pod{} out.pod = *o.(*corev1.Pod) return &out @@ -423,7 +423,7 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, if !podsInGroup[i].DeletionTimestamp.IsZero() || (stopReason != jobframework.StopReasonWorkloadDeleted && podSuspended(&podsInGroup[i])) { continue } - podInGroup := fromObject(&podsInGroup[i]) + podInGroup := FromObject(&podsInGroup[i]) // The podset info is not relevant here, since this should mark the pod's end of life pCopy := &corev1.Pod{ @@ -648,7 +648,7 @@ func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) { } if !podRoleFound { - podSet := fromObject(&podInGroup).PodSets() + podSet := FromObject(&podInGroup).PodSets() podSet[0].Name = roleHash resultPodSets = append(resultPodSets, podSet[0]) diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index c268f46838..640d7e273c 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -74,7 +74,7 @@ func TestPodsReady(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - pod := fromObject(tc.pod) + pod := FromObject(tc.pod) got := pod.PodsReady() if tc.want != got { t.Errorf("Unexpected response (want: %v, got: %v)", tc.want, got) @@ -98,7 +98,7 @@ func TestRun(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - pod := fromObject(&tc.pods[0]) + pod := FromObject(&tc.pods[0]) ctx, _ := utiltesting.ContextWithLog(t) clientBuilder := utiltesting.NewClientBuilder() @@ -3554,9 +3554,9 @@ func TestIsPodOwnerManagedByQueue(t *testing.T) { pod.OwnerReferences = append(pod.OwnerReferences, tc.ownerReference) - if tc.wantRes != IsPodOwnerManagedByKueue(fromObject(pod)) { + if tc.wantRes != IsPodOwnerManagedByKueue(FromObject(pod)) { t.Errorf("Unexpected 'IsPodOwnerManagedByKueue' result\n want: %t\n got: %t)", - tc.wantRes, IsPodOwnerManagedByKueue(fromObject(pod))) + tc.wantRes, 
IsPodOwnerManagedByKueue(FromObject(pod))) } }) } diff --git a/pkg/controller/jobs/pod/pod_webhook.go b/pkg/controller/jobs/pod/pod_webhook.go index b81ae06563..ecee0c6a90 100644 --- a/pkg/controller/jobs/pod/pod_webhook.go +++ b/pkg/controller/jobs/pod/pod_webhook.go @@ -133,7 +133,7 @@ func (p *Pod) addRoleHash() error { } func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { - pod := fromObject(obj) + pod := FromObject(obj) log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&pod.pod)) log.V(5).Info("Applying defaults") @@ -201,7 +201,7 @@ var _ webhook.CustomValidator = &PodWebhook{} func (w *PodWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { var warnings admission.Warnings - pod := fromObject(obj) + pod := FromObject(obj) log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&pod.pod)) log.V(5).Info("Validating create") allErrs := jobframework.ValidateCreateForQueueName(pod) @@ -220,8 +220,8 @@ func (w *PodWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (ad func (w *PodWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { var warnings admission.Warnings - oldPod := fromObject(oldObj) - newPod := fromObject(newObj) + oldPod := FromObject(oldObj) + newPod := FromObject(newObj) log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&newPod.pod)) log.V(5).Info("Validating update") allErrs := jobframework.ValidateUpdateForQueueName(oldPod, newPod)