From c86a3de8be4998918e78b437131a73c65a07ba02 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Thu, 7 Mar 2024 14:26:25 +0200 Subject: [PATCH] [cmd/importer] Initial implementation. --- cmd/importer/main.go | 183 +++++++++++++++++ cmd/importer/pod/check.go | 69 +++++++ cmd/importer/pod/check_test.go | 137 +++++++++++++ cmd/importer/pod/import.go | 207 +++++++++++++++++++ cmd/importer/pod/import_test.go | 162 +++++++++++++++ cmd/importer/util/util.go | 220 +++++++++++++++++++++ go.mod | 4 +- test/integration/framework/framework.go | 15 +- test/integration/importer/importer_test.go | 155 +++++++++++++++ test/integration/importer/suite_test.go | 102 ++++++++++ 10 files changed, 1249 insertions(+), 5 deletions(-) create mode 100644 cmd/importer/main.go create mode 100644 cmd/importer/pod/check.go create mode 100644 cmd/importer/pod/check_test.go create mode 100644 cmd/importer/pod/import.go create mode 100644 cmd/importer/pod/import_test.go create mode 100644 cmd/importer/util/util.go create mode 100644 test/integration/importer/importer_test.go create mode 100644 test/integration/importer/suite_test.go diff --git a/cmd/importer/main.go b/cmd/importer/main.go new file mode 100644 index 0000000000..92fe6ed27d --- /dev/null +++ b/cmd/importer/main.go @@ -0,0 +1,183 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	"github.com/spf13/cobra"
+	"github.com/spf13/pflag"
+	"go.uber.org/zap/zapcore"
+	"k8s.io/client-go/kubernetes/scheme"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/cmd/importer/pod"
+	"sigs.k8s.io/kueue/cmd/importer/util"
+	"sigs.k8s.io/kueue/pkg/util/useragent"
+)
+
+const (
+	NamespaceFlag      = "namespace"
+	NamespaceFlagShort = "n"
+	QueueMappingFlag   = "queuemapping"
+	QueueLabelFlag     = "queuelabel"
+	QPSFlag            = "qps"
+	BurstFlag          = "burst"
+	VerbosityFlag      = "verbose"
+	VerboseFlagShort   = "v"
+	JobsFlag           = "jobs"
+	JobsFlagShort      = "j"
+)
+
+var (
+	k8sClient client.Client
+	rootCmd   = &cobra.Command{
+		Use:   "importer",
+		Short: "Import existing (running) objects into Kueue",
+		PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
+			v, _ := cmd.Flags().GetCount(VerbosityFlag)
+			level := -(v + 1)
+			ctrl.SetLogger(zap.New(
+				zap.UseDevMode(true),
+				zap.ConsoleEncoder(),
+				zap.Level(zapcore.Level(level)),
+			))
+
+			kubeConfig, err := ctrl.GetConfig()
+			if err != nil {
+				return err
+			}
+			if kubeConfig.UserAgent == "" {
+				kubeConfig.UserAgent = useragent.Default()
+			}
+			qps, err := cmd.Flags().GetFloat32(QPSFlag)
+			if err != nil {
+				return err
+			}
+			kubeConfig.QPS = qps
+			burst, err := cmd.Flags().GetInt(BurstFlag)
+			if err != nil {
+				return err
+			}
+			kubeConfig.Burst = burst
+
+			if err := kueue.AddToScheme(scheme.Scheme); err != nil {
+				return err
+			}
+
+			c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme})
+			if err != nil {
+				return err
+			}
+			k8sClient = c
+			return nil
+		},
+		RunE: func(cmd *cobra.Command, args []string) error {
+			cmd.Flags().Visit(func(f *pflag.Flag) {
+				fmt.Println(f.Name, f.Value.String())
+			})
+			return checkCmd(cmd, args)
+		},
+	}
+)
+
+func init() {
+	rootCmd.PersistentFlags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)")
+	rootCmd.MarkPersistentFlagRequired(NamespaceFlag)
+
+	rootCmd.PersistentFlags().String(QueueLabelFlag, "", "label used to identify the target local queue")
+	rootCmd.PersistentFlags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names")
+	rootCmd.MarkPersistentFlagRequired(QueueLabelFlag)
+
+	rootCmd.PersistentFlags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
+	rootCmd.PersistentFlags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
+	rootCmd.PersistentFlags().UintP(JobsFlag, JobsFlagShort, 8, "number of concurrent jobs")
+
+	rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)")
+
+	rootCmd.AddCommand(
+		&cobra.Command{
+			Use:  "check",
+			RunE: checkCmd,
+		},
+		&cobra.Command{
+			Use:  "import",
+			RunE: importCmd,
+		},
+	)
+}
+
+func main() {
+	err := rootCmd.Execute()
+	if err != nil {
+		os.Exit(1)
+	}
+}
+
+func loadMappingCache(ctx context.Context, cmd *cobra.Command) (*util.ImportCache, error) {
+	flags := cmd.Flags()
+	namespaces, err := flags.GetStringSlice(NamespaceFlag)
+	if err != nil {
+		return nil, err
+	}
+	queueLabel, err := flags.GetString(QueueLabelFlag)
+	if err != nil {
+		return nil, err
+	}
+	mapping, err := flags.GetStringToString(QueueMappingFlag)
+	if err != nil {
+		return nil, err
+	}
+	return util.LoadImportCache(ctx, k8sClient, namespaces, queueLabel, mapping)
+}
+
+func checkCmd(cmd *cobra.Command, _ []string) error {
+	log := ctrl.Log.WithName("check")
+	ctx := ctrl.LoggerInto(context.Background(), log)
+	jobs, _ := cmd.Flags().GetUint(JobsFlag)
+	cache, err := loadMappingCache(ctx, cmd)
+	if err != nil {
+		return err
+	}
+
+	return pod.Check(ctx, k8sClient, cache, jobs)
+}
+
+func importCmd(cmd *cobra.Command, _ []string) error {
+	log := ctrl.Log.WithName("import")
+	ctx := ctrl.LoggerInto(context.Background(), log)
+	jobs, _ := cmd.Flags().GetUint(JobsFlag)
+	cache, err := loadMappingCache(ctx, cmd)
+	if err != nil {
+		return err
+	}
+
+	if err = pod.Check(ctx, k8sClient, cache, jobs); err != nil {
+		return err
+	}
+
+	return pod.Import(ctx, k8sClient, cache, jobs)
+}
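Note: the flags above combine as follows — pods are selected by carrying the --queuelabel label, and that label's value picks the target LocalQueue through --queuemapping. A minimal sketch of the resolution (all names and values here are illustrative; the actual lookup is ImportCache.LocalQueue in cmd/importer/util/util.go below):

    queueLabel := "src.lbl"                          // --queuelabel=src.lbl
    mapping := map[string]string{"q1": "lq1"}        // --queuemapping=q1=lq1
    podLabels := map[string]string{queueLabel: "q1"} // labels of a candidate pod

    if key, ok := podLabels[queueLabel]; !ok {
        // pod is never listed: PushPods filters on the queue label
    } else if lq, ok := mapping[key]; !ok {
        // resolution fails with ErrNoMapping
    } else {
        fmt.Printf("pod will be imported into LocalQueue %q\n", lq)
    }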
diff --git a/cmd/importer/pod/check.go b/cmd/importer/pod/check.go
new file mode 100644
index 0000000000..2196302ecf
--- /dev/null
+++ b/cmd/importer/pod/check.go
@@ -0,0 +1,69 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pod
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"sigs.k8s.io/kueue/cmd/importer/util"
+)
+
+func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
+	ch := make(chan corev1.Pod)
+	go util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
+	summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
+		log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
+		log.V(3).Info("Checking")
+
+		cq, err := cache.ClusterQueue(p)
+		if err != nil {
+			return err
+		}
+
+		if len(cq.Spec.ResourceGroups) == 0 {
+			return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid)
+		}
+
+		if len(cq.Spec.ResourceGroups[0].Flavors) == 0 {
+			return fmt.Errorf("%q has no flavors in its first resource group: %w", cq.Name, util.ErrCQInvalid)
+		}
+
+		rf := string(cq.Spec.ResourceGroups[0].Flavors[0].Name)
+		if _, found := cache.ResourceFlavors[rf]; !found {
+			return fmt.Errorf("%q flavor %q: %w", cq.Name, rf, util.ErrCQInvalid)
+		}
+
+		// Additional checks could be added here, e.g. (maybe) on the
+		// resources managed by the queues.
+		return nil
+	})
+
+	fmt.Printf("Checked %d pods\n", summary.TotalPods)
+	fmt.Printf("Failed %d pods\n", summary.FailedPods)
+	for e, pods := range summary.ErrorsForPods {
+		fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0])
+	}
+	return errors.Join(summary.Errors...)
+}
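Note: concretely, Check passes when the pod's ClusterQueue has at least one resource group, that group has at least one flavor, and the first flavor names a known ResourceFlavor. The minimal passing shape, sketched with the test wrappers used in the tests below ("cq1"/"rf1" are illustrative):

    // A ClusterQueue/ResourceFlavor pair that satisfies Check.
    cq := utiltesting.MakeClusterQueue("cq1").
        ResourceGroup(*utiltesting.MakeFlavorQuotas("rf1").
            Resource(corev1.ResourceCPU, "1").Obj()).
        Obj()
    rf := utiltesting.MakeResourceFlavor("rf1").Obj()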
diff --git a/cmd/importer/pod/check_test.go b/cmd/importer/pod/check_test.go
new file mode 100644
index 0000000000..975842e25f
--- /dev/null
+++ b/cmd/importer/pod/check_test.go
@@ -0,0 +1,137 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pod
+
+import (
+	"context"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	corev1 "k8s.io/api/core/v1"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/cmd/importer/util"
+	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
+	testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
+)
+
+const (
+	testingNamespace  = "ns"
+	testingQueueLabel = "testing.lbl"
+)
+
+func TestCheckNamespace(t *testing.T) {
+	basePodWrapper := testingpod.MakePod("pod", testingNamespace).
+		Label(testingQueueLabel, "q1")
+
+	baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1")
+	baseClusterQueue := utiltesting.MakeClusterQueue("cq1")
+
+	cases := map[string]struct {
+		pods          []corev1.Pod
+		clusterQueues []kueue.ClusterQueue
+		localQueues   []kueue.LocalQueue
+		mapping       map[string]string
+		flavors       []kueue.ResourceFlavor
+
+		wantError error
+	}{
+		"empty cluster": {},
+		"no mapping": {
+			pods: []corev1.Pod{
+				*basePodWrapper.Clone().Obj(),
+			},
+			wantError: util.ErrNoMapping,
+		},
+		"no local queue": {
+			pods: []corev1.Pod{
+				*basePodWrapper.Clone().Obj(),
+			},
+			mapping: map[string]string{
+				"q1": "lq1",
+			},
+			wantError: util.ErrLQNotFound,
+		},
+		"no cluster queue": {
+			pods: []corev1.Pod{
+				*basePodWrapper.Clone().Obj(),
+			},
+			mapping: map[string]string{
+				"q1": "lq1",
+			},
+			localQueues: []kueue.LocalQueue{
+				*baseLocalQueue.Obj(),
+			},
+			wantError: util.ErrCQNotFound,
+		},
+		"invalid cq": {
+			pods: []corev1.Pod{
+				*basePodWrapper.Clone().Obj(),
+			},
+			mapping: map[string]string{
+				"q1": "lq1",
+			},
+			localQueues: []kueue.LocalQueue{
+				*baseLocalQueue.Obj(),
+			},
+			clusterQueues: []kueue.ClusterQueue{
+				*baseClusterQueue.Obj(),
+			},
+			wantError: util.ErrCQInvalid,
+		},
+		"all found": {
+			pods: []corev1.Pod{
+				*basePodWrapper.Clone().Obj(),
+			},
+			mapping: map[string]string{
+				"q1": "lq1",
+			},
+			localQueues: []kueue.LocalQueue{
+				*baseLocalQueue.Obj(),
+			},
+			clusterQueues: []kueue.ClusterQueue{
+				*utiltesting.MakeClusterQueue("cq1").ResourceGroup(*utiltesting.MakeFlavorQuotas("rf1").Resource(corev1.ResourceCPU, "1").Obj()).Obj(),
+			},
+			flavors: []kueue.ResourceFlavor{
+				*utiltesting.MakeResourceFlavor("rf1").Obj(),
+			},
+		},
+	}
+
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			podsList := corev1.PodList{Items: tc.pods}
+			cqList := kueue.ClusterQueueList{Items: tc.clusterQueues}
+			lqList := kueue.LocalQueueList{Items: tc.localQueues}
+			rfList := kueue.ResourceFlavorList{Items: tc.flavors}
+
+			builder := utiltesting.NewClientBuilder()
+			builder = builder.WithLists(&podsList, &cqList, &lqList, &rfList)
+
+			client := builder.Build()
+			ctx := context.Background()
+
+			mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping)
+			gotErr := Check(ctx, client, mpc, 8)
+
+			if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
+				t.Errorf("Unexpected error (-want/+got)\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go
new file mode 100644
index 0000000000..24d271d771
--- /dev/null
+++ b/cmd/importer/pod/import.go
@@ -0,0 +1,207 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pod
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/cmd/importer/util"
+	"sigs.k8s.io/kueue/pkg/constants"
+	controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
+	"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
+	"sigs.k8s.io/kueue/pkg/workload"
+)
+
+func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
+	ch := make(chan corev1.Pod)
+	go util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
+	summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
+		log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
+		log.V(3).Info("Importing")
+
+		lq, err := cache.LocalQueue(p)
+		if err != nil {
+			return err
+		}
+
+		oldLq, found := p.Labels[controllerconstants.QueueLabel]
+		if !found {
+			if err := addLabels(ctx, c, p, lq.Name); err != nil {
+				return fmt.Errorf("cannot add queue label: %w", err)
+			}
+		} else if oldLq != lq.Name {
+			return fmt.Errorf("the pod already has the queue label set to %q, expected %q", oldLq, lq.Name)
+		}
+
+		kp := pod.FromObject(p)
+		// Note: the recorder is not used for single pods, we can just pass nil for now.
+		wl, err := kp.ConstructComposableWorkload(ctx, c, nil)
+		if err != nil {
+			return fmt.Errorf("construct workload: %w", err)
+		}
+
+		if pc, found := cache.PriorityClasses[p.Spec.PriorityClassName]; found {
+			wl.Spec.PriorityClassName = pc.Name
+			wl.Spec.Priority = &pc.Value
+			wl.Spec.PriorityClassSource = constants.PodPriorityClassSource
+		}
+
+		if err := createWorkload(ctx, c, wl); err != nil {
+			return fmt.Errorf("creating workload: %w", err)
+		}
+
+		// Make up the workload's admission and update its status.
+		info := workload.NewInfo(wl)
+		cq := cache.ClusterQueues[string(lq.Spec.ClusterQueue)]
+		admission := kueue.Admission{
+			ClusterQueue: kueue.ClusterQueueReference(cq.Name),
+			PodSetAssignments: []kueue.PodSetAssignment{
+				{
+					Name:          info.TotalRequests[0].Name,
+					Flavors:       make(map[corev1.ResourceName]kueue.ResourceFlavorReference),
+					ResourceUsage: info.TotalRequests[0].Requests.ToResourceList(),
+					Count:         ptr.To[int32](1),
+				},
+			},
+		}
+		flv := cq.Spec.ResourceGroups[0].Flavors[0].Name
+		for r := range info.TotalRequests[0].Requests {
+			admission.PodSetAssignments[0].Flavors[r] = flv
+		}
+
+		wl.Status.Admission = &admission
+		reservedCond := metav1.Condition{
+			Type:    kueue.WorkloadQuotaReserved,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name),
+		}
+		apimeta.SetStatusCondition(&wl.Status.Conditions, reservedCond)
+		admittedCond := metav1.Condition{
+			Type:    kueue.WorkloadAdmitted,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name),
+		}
+		apimeta.SetStatusCondition(&wl.Status.Conditions, admittedCond)
+		return admitWorkload(ctx, c, wl)
+	})
+
+	fmt.Printf("Processed %d pods\n", summary.TotalPods)
+	fmt.Printf("Failed %d pods\n", summary.FailedPods)
+	for e, pods := range summary.ErrorsForPods {
+		fmt.Printf("%dx: %s\n\t Observed for pods %v\n", len(pods), e, pods)
+	}
+	return errors.Join(summary.Errors...)
+}
+
+func checkError(err error) (retry, reload bool, timeout time.Duration) {
+	retrySeconds, retry := apierrors.SuggestsClientDelay(err)
+	if retry {
+		return true, false, time.Duration(retrySeconds) * time.Second
+	}
+
+	if apierrors.IsConflict(err) {
+		return true, true, 0
+	}
+	return false, false, 0
+}
+
+func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string) error {
+	p.Labels[controllerconstants.QueueLabel] = queue
+	p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+
+	err := c.Update(ctx, p)
+	retry, reload, timeout := checkError(err)
+
+	for retry {
+		if timeout > 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		if reload {
+			err = c.Get(ctx, client.ObjectKeyFromObject(p), p)
+			if err != nil {
+				retry, reload, timeout = checkError(err)
+				continue
+			}
+			p.Labels[controllerconstants.QueueLabel] = queue
+			p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+		}
+		err = c.Update(ctx, p)
+		retry, reload, timeout = checkError(err)
+	}
+	return err
+}
+
+func createWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error {
+	err := c.Create(ctx, wl)
+	if apierrors.IsAlreadyExists(err) {
+		return nil
+	}
+	retry, _, timeout := checkError(err)
+	for retry {
+		if timeout > 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		err = c.Create(ctx, wl)
+		retry, _, timeout = checkError(err)
+	}
+	return err
+}
+
+func admitWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error {
+	err := workload.ApplyAdmissionStatus(ctx, c, wl, false)
+	if apierrors.IsAlreadyExists(err) {
+		return nil
+	}
+	retry, _, timeout := checkError(err)
+	for retry {
+		if timeout > 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		err = workload.ApplyAdmissionStatus(ctx, c, wl, false)
+		retry, _, timeout = checkError(err)
+	}
+	return err
+}
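Note: addLabels, createWorkload and admitWorkload above repeat the same retry loop; a possible follow-up could factor it into one helper. A sketch (not part of this patch) that preserves the checkError semantics — wait out a suggested client delay, optionally reload on conflict, retry:

    // op performs the mutating call; reload, when non-nil, refreshes
    // local state after a conflict before the next attempt.
    func withRetry(ctx context.Context, op func() error, reload func() error) error {
        err := op()
        retry, doReload, timeout := checkError(err)
        for retry {
            if timeout > 0 {
                select {
                case <-ctx.Done():
                    return errors.New("context canceled")
                case <-time.After(timeout):
                }
            }
            if doReload && reload != nil {
                if rerr := reload(); rerr != nil {
                    retry, doReload, timeout = checkError(rerr)
                    continue
                }
            }
            err = op()
            retry, doReload, timeout = checkError(err)
        }
        return err
    }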
diff --git a/cmd/importer/pod/import_test.go b/cmd/importer/pod/import_test.go
new file mode 100644
index 0000000000..ed77519e41
--- /dev/null
+++ b/cmd/importer/pod/import_test.go
@@ -0,0 +1,162 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pod
+
+import (
+	"context"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/cmd/importer/util"
+	"sigs.k8s.io/kueue/pkg/controller/constants"
+	"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
+	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
+	testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
+)
+
+func TestImportNamespace(t *testing.T) {
+	basePodWrapper := testingpod.MakePod("pod", testingNamespace).
+		UID("pod").
+		Label(testingQueueLabel, "q1").
+		Image("img", nil).
+		Request(corev1.ResourceCPU, "1")
+
+	baseWlWrapper := utiltesting.MakeWorkload("pod-pod-b17ab", testingNamespace).
+		ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "pod").
+		Label(constants.JobUIDLabel, "pod").
+		Finalizers(kueue.ResourceInUseFinalizerName).
+		Queue("lq1").
+		PodSets(*utiltesting.MakePodSet("main", 1).
+			Image("img").
+			Request(corev1.ResourceCPU, "1").
+			Obj()).
+		ReserveQuota(utiltesting.MakeAdmission("cq1").
+			Assignment(corev1.ResourceCPU, "f1", "1").
+			Obj()).
+		Condition(metav1.Condition{
+			Type:    kueue.WorkloadQuotaReserved,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: "Imported into ClusterQueue cq1",
+		}).
+		Condition(metav1.Condition{
+			Type:    kueue.WorkloadAdmitted,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: "Imported into ClusterQueue cq1",
+		})
+
+	baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1")
+	baseClusterQueue := utiltesting.MakeClusterQueue("cq1").
+		ResourceGroup(
+			*utiltesting.MakeFlavorQuotas("f1").Resource(corev1.ResourceCPU, "1", "0").Obj())
+
+	podCmpOpts := []cmp.Option{
+		cmpopts.EquateEmpty(),
+		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"),
+	}
+
+	wlCmpOpts := []cmp.Option{
+		cmpopts.EquateEmpty(),
+		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"),
+		cmpopts.IgnoreFields(metav1.Condition{}, "ObservedGeneration", "LastTransitionTime"),
+	}
+
+	cases := map[string]struct {
+		pods          []corev1.Pod
+		clusterQueues []kueue.ClusterQueue
+		localQueues   []kueue.LocalQueue
+		mapping       map[string]string
+
+		wantPods      []corev1.Pod
+		wantWorkloads []kueue.Workload
+		wantError     error
+	}{
+		"create one": {
+			pods: []corev1.Pod{
+				*basePodWrapper.Clone().Obj(),
+			},
+			mapping: map[string]string{
+				"q1": "lq1",
+			},
+			localQueues: []kueue.LocalQueue{
+				*baseLocalQueue.Obj(),
+			},
+			clusterQueues: []kueue.ClusterQueue{
+				*baseClusterQueue.Obj(),
+			},
+
+			wantPods: []corev1.Pod{
+				*basePodWrapper.Clone().
+					Label(constants.QueueLabel, "lq1").
+					Label(pod.ManagedLabelKey, pod.ManagedLabelValue).
+					Obj(),
+			},
+
+			wantWorkloads: []kueue.Workload{
+				*baseWlWrapper.Clone().Obj(),
+			},
+		},
+	}
+
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			podsList := corev1.PodList{Items: tc.pods}
+			cqList := kueue.ClusterQueueList{Items: tc.clusterQueues}
+			lqList := kueue.LocalQueueList{Items: tc.localQueues}
+
+			builder := utiltesting.NewClientBuilder().
+				WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}).
+				WithStatusSubresource(&kueue.Workload{}).
+				WithLists(&podsList, &cqList, &lqList)
+
+			client := builder.Build()
+			ctx := context.Background()
+
+			mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping)
+			gotErr := Import(ctx, client, mpc, 8)
+
+			if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
+				t.Errorf("Unexpected error (-want/+got)\n%s", diff)
+			}
+
+			err := client.List(ctx, &podsList)
+			if err != nil {
+				t.Errorf("Unexpected list pod error: %s", err)
+			}
+			if diff := cmp.Diff(tc.wantPods, podsList.Items, podCmpOpts...); diff != "" {
+				t.Errorf("Unexpected pods (-want/+got)\n%s", diff)
+			}
+
+			wlList := kueue.WorkloadList{}
+			err = client.List(ctx, &wlList)
+			if err != nil {
+				t.Errorf("Unexpected list workloads error: %s", err)
+			}
+			if diff := cmp.Diff(tc.wantWorkloads, wlList.Items, wlCmpOpts...); diff != "" {
+				t.Errorf("Unexpected workloads (-want/+got)\n%s", diff)
+			}
+		})
+	}
+}
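Note: both Check and Import drive their per-pod work through the same producer/consumer pair defined in the util package that follows — PushPods pages matching pods into a channel, and ConcurrentProcessPod fans them out to a worker pool. In outline (names as in the patch):

    ch := make(chan corev1.Pod)
    go util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch) // producer; closes ch when done

    summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
        return nil // per-pod work, executed by `jobs` concurrent workers
    })
    fmt.Printf("processed %d pods, %d failed\n", summary.TotalPods, summary.FailedPods)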
diff --git a/cmd/importer/util/util.go b/cmd/importer/util/util.go
new file mode 100644
index 0000000000..c896071495
--- /dev/null
+++ b/cmd/importer/util/util.go
@@ -0,0 +1,220 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"maps"
+	"slices"
+	"sync"
+
+	corev1 "k8s.io/api/core/v1"
+	schedulingv1 "k8s.io/api/scheduling/v1"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	utilslices "sigs.k8s.io/kueue/pkg/util/slices"
+)
+
+const (
+	ListLength = 100
+)
+
+var (
+	ErrNoMappingKey = errors.New("no mapping key found")
+	ErrNoMapping    = errors.New("no mapping found")
+	ErrLQNotFound   = errors.New("localqueue not found")
+	ErrCQNotFound   = errors.New("clusterqueue not found")
+	ErrCQInvalid    = errors.New("clusterqueue invalid")
+)
+
+func ListOptions(namespace, queueLabel, continueToken string) []client.ListOption {
+	opts := []client.ListOption{
+		client.InNamespace(namespace),
+		client.HasLabels{queueLabel},
+		client.Limit(ListLength),
+		client.Continue(continueToken),
+	}
+	return opts
+}
+
+type ImportCache struct {
+	Namespaces      []string
+	QueueLabel      string
+	Mapping         map[string]string
+	LocalQueues     map[string]map[string]*kueue.LocalQueue
+	ClusterQueues   map[string]*kueue.ClusterQueue
+	ResourceFlavors map[string]*kueue.ResourceFlavor
+	PriorityClasses map[string]*schedulingv1.PriorityClass
+}
+
+func LoadImportCache(ctx context.Context, c client.Client, namespaces []string, queueLabel string, mapping map[string]string) (*ImportCache, error) {
+	ret := ImportCache{
+		Namespaces:  slices.Clone(namespaces),
+		QueueLabel:  queueLabel,
+		Mapping:     maps.Clone(mapping),
+		LocalQueues: make(map[string]map[string]*kueue.LocalQueue),
+	}
+
+	// get the cluster queues
+	cqList := &kueue.ClusterQueueList{}
+	if err := c.List(ctx, cqList); err != nil {
+		return nil, fmt.Errorf("loading cluster queues: %w", err)
+	}
+	ret.ClusterQueues = utilslices.ToRefMap(cqList.Items, func(cq *kueue.ClusterQueue) string { return cq.Name })
+
+	// get the local queues
+	for _, ns := range namespaces {
+		lqList := &kueue.LocalQueueList{}
+		if err := c.List(ctx, lqList, client.InNamespace(ns)); err != nil {
+			return nil, fmt.Errorf("loading local queues in namespace %s: %w", ns, err)
+		}
+		ret.LocalQueues[ns] = utilslices.ToRefMap(lqList.Items, func(lq *kueue.LocalQueue) string { return lq.Name })
+	}
+
+	// get the resource flavors
+	rfList := &kueue.ResourceFlavorList{}
+	if err := c.List(ctx, rfList); err != nil {
+		return nil, fmt.Errorf("loading resource flavors: %w", err)
+	}
+	ret.ResourceFlavors = utilslices.ToRefMap(rfList.Items, func(rf *kueue.ResourceFlavor) string { return rf.Name })
+
+	// get the priority classes
+	pcList := &schedulingv1.PriorityClassList{}
+	if err := c.List(ctx, pcList); err != nil {
+		return nil, fmt.Errorf("loading priority classes: %w", err)
+	}
+	ret.PriorityClasses = utilslices.ToRefMap(pcList.Items, func(pc *schedulingv1.PriorityClass) string { return pc.Name })
+	return &ret, nil
+}
+
+func (mappingCache *ImportCache) LocalQueue(p *corev1.Pod) (*kueue.LocalQueue, error) {
+	mappingKey, found := p.Labels[mappingCache.QueueLabel]
+	if !found {
+		return nil, ErrNoMappingKey
+	}
+
+	queueName, found := mappingCache.Mapping[mappingKey]
+	if !found {
+		return nil, fmt.Errorf("%s: %w", mappingKey, ErrNoMapping)
+	}
+
+	nsQueues, found := mappingCache.LocalQueues[p.Namespace]
+	if !found {
+		return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound)
+	}
+
+	lq, found := nsQueues[queueName]
+	if !found {
+		return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound)
+	}
+	return lq, nil
+}
+
+func (mappingCache *ImportCache) ClusterQueue(p *corev1.Pod) (*kueue.ClusterQueue, error) {
+	lq, err := mappingCache.LocalQueue(p)
+	if err != nil {
+		return nil, err
+	}
+	queueName := string(lq.Spec.ClusterQueue)
+	cq, found := mappingCache.ClusterQueues[queueName]
+	if !found {
+		return nil, fmt.Errorf("cluster queue: %s: %w", queueName, ErrCQNotFound)
+	}
+	return cq, nil
+}
+
+func PushPods(ctx context.Context, c client.Client, namespaces []string, queueLabel string, ch chan<- corev1.Pod) error {
+	defer close(ch)
+	for _, ns := range namespaces {
+		lst := &corev1.PodList{}
+		log := ctrl.LoggerFrom(ctx).WithValues("namespace", ns)
+		log.V(3).Info("Begin pods list")
+		defer log.V(3).Info("End pods list")
+		page := 0
+		for {
+			err := c.List(ctx, lst, ListOptions(ns, queueLabel, lst.Continue)...)
+			if err != nil {
+				log.Error(err, "list")
+				return fmt.Errorf("listing pods in %s, page %d: %w", ns, page, err)
+			}
+
+			for _, p := range lst.Items {
+				// ignore deleted or in final Phase?
+ ch <- p + } + + page++ + if lst.Continue == "" { + log.V(2).Info("No more pods", "pages", page) + break + } + } + } + return nil +} + +type Result struct { + Pod string + Err error +} + +type ProcessSummary struct { + TotalPods int + FailedPods int + ErrorsForPods map[string][]string + Errors []error +} + +func ConcurrentProcessPod(ch <-chan corev1.Pod, jobs uint, f func(p *corev1.Pod) error) ProcessSummary { + wg := sync.WaitGroup{} + resultCh := make(chan Result) + + wg.Add(int(jobs)) + for i := 0; i < int(jobs); i++ { + go func() { + defer wg.Done() + for pod := range ch { + err := f(&pod) + resultCh <- Result{Pod: client.ObjectKeyFromObject(&pod).String(), Err: err} + } + }() + } + go func() { + wg.Wait() + close(resultCh) + }() + + ps := ProcessSummary{ + ErrorsForPods: make(map[string][]string), + } + for result := range resultCh { + ps.TotalPods++ + if result.Err != nil { + ps.FailedPods++ + estr := result.Err.Error() + if _, found := ps.ErrorsForPods[estr]; !found { + ps.Errors = append(ps.Errors, result.Err) + } + ps.ErrorsForPods[estr] = append(ps.ErrorsForPods[estr], result.Pod) + } + } + return ps +} diff --git a/go.mod b/go.mod index 3c99c817c3..1443e10f02 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.6.0 github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0 + github.com/spf13/cobra v1.8.0 + github.com/spf13/pflag v1.0.5 go.uber.org/zap v1.27.0 k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 @@ -82,8 +84,6 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/spf13/cobra v1.8.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect go.etcd.io/etcd/api/v3 v3.5.11 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.11 // indirect diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go index 92ccc216f1..d465781df9 100644 --- a/test/integration/framework/framework.go +++ b/test/integration/framework/framework.go @@ -85,7 +85,7 @@ func (f *Framework) Init() *rest.Config { return cfg } -func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) { +func (f *Framework) SetupClient(cfg *rest.Config) (context.Context, client.Client) { err := config.AddToScheme(scheme.Scheme) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) @@ -116,6 +116,13 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) gomega.ExpectWithOffset(1, k8sClient).NotTo(gomega.BeNil()) + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + + return ctx, k8sClient +} + +func (f *Framework) StartManager(ctx context.Context, cfg *rest.Config, managerSetup ManagerSetup) { webhookInstallOptions := &f.testEnv.WebhookInstallOptions mgrOpts := manager.Options{ Scheme: scheme.Scheme, @@ -132,8 +139,6 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con mgr, err := ctrl.NewManager(cfg, mgrOpts) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to create manager") - ctx, cancel := context.WithCancel(context.Background()) - f.cancel = cancel managerSetup(mgr, ctx) go func() { @@ -155,7 +160,11 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con return nil 
}).Should(gomega.Succeed()) } +} +func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) { + ctx, k8sClient := f.SetupClient(cfg) + f.StartManager(ctx, cfg, managerSetup) return ctx, k8sClient } diff --git a/test/integration/importer/importer_test.go b/test/integration/importer/importer_test.go new file mode 100644 index 0000000000..4986ff5dfe --- /dev/null +++ b/test/integration/importer/importer_test.go @@ -0,0 +1,155 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package importer + +import ( + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "sigs.k8s.io/controller-runtime/pkg/client" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + importerpod "sigs.k8s.io/kueue/cmd/importer/pod" + importerutil "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/metrics" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + utiltestingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + "sigs.k8s.io/kueue/test/util" +) + +var _ = ginkgo.Describe("Importer", func() { + var ( + ns *corev1.Namespace + flavor *kueue.ResourceFlavor + cq *kueue.ClusterQueue + lq *kueue.LocalQueue + ) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "import-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + flavor = utiltesting.MakeResourceFlavor("f1").Obj() + gomega.Expect(k8sClient.Create(ctx, flavor)).To(gomega.Succeed()) + + cq = utiltesting.MakeClusterQueue("cq1"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("f1").Resource(corev1.ResourceCPU, "4").Obj(), + ). + Obj() + gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed()) + + lq = utiltesting.MakeLocalQueue("lq1", ns.Name).ClusterQueue("cq1").Obj() + gomega.Expect(k8sClient.Create(ctx, lq)).To(gomega.Succeed()) + + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cq, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor, true) + }) + + ginkgo.When("Kueue is started after import", func() { + ginkgo.It("Should keep the imported pods admitted", func() { + + pod1 := utiltestingpod.MakePod("pod1", ns.Name). + Label("src.lbl", "src-val"). + Request(corev1.ResourceCPU, "2"). + Obj() + pod2 := utiltestingpod.MakePod("pod2", ns.Name). + Label("src.lbl", "src-val"). + Request(corev1.ResourceCPU, "2"). 
+			Obj()
+
+		ginkgo.By("Creating the initial pods", func() {
+			gomega.Expect(k8sClient.Create(ctx, pod1)).To(gomega.Succeed())
+			gomega.Expect(k8sClient.Create(ctx, pod2)).To(gomega.Succeed())
+		})
+
+		ginkgo.By("Running the import", func() {
+			mapping, err := importerutil.LoadImportCache(ctx, k8sClient, []string{ns.Name}, "src.lbl", map[string]string{"src-val": "lq1"})
+			gomega.Expect(err).ToNot(gomega.HaveOccurred())
+			gomega.Expect(mapping).ToNot(gomega.BeNil())
+
+			gomega.Expect(importerpod.Check(ctx, k8sClient, mapping, 8)).To(gomega.Succeed())
+			gomega.Expect(importerpod.Import(ctx, k8sClient, mapping, 8)).To(gomega.Succeed())
+		})
+
+		wl1LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod1.Name, pod1.UID), Namespace: ns.Name}
+		wl2LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod2.Name, pod2.UID), Namespace: ns.Name}
+		wl1 := &kueue.Workload{}
+		wl2 := &kueue.Workload{}
+
+		ginkgo.By("Checking the Workloads are created and admitted", func() {
+			gomega.Expect(k8sClient.Get(ctx, wl1LookupKey, wl1)).To(gomega.Succeed())
+			gomega.Expect(k8sClient.Get(ctx, wl2LookupKey, wl2)).To(gomega.Succeed())
+
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2)
+		})
+
+		ginkgo.By("Starting kueue, the cluster queue status should account for the imported Workloads", func() {
+			fwk.StartManager(ctx, cfg, managerAndSchedulerSetup)
+
+			util.ExpectClusterQueueStatusMetric(cq, metrics.CQStatusActive)
+			gomega.Eventually(func(g gomega.Gomega) {
+				updatedQueue := &kueue.ClusterQueue{}
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), updatedQueue)).To(gomega.Succeed())
+				g.Expect(updatedQueue.Status.AdmittedWorkloads).To(gomega.Equal(int32(2)))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
+		pod3 := utiltestingpod.MakePod("pod3", ns.Name).
+			Queue("lq1").
+			Label("kueue.x-k8s.io/managed", "true").
+			Request(corev1.ResourceCPU, "2").
+			KueueSchedulingGate().
+			Obj()
+
+		ginkgo.By("Creating a new pod", func() {
+			gomega.Expect(k8sClient.Create(ctx, pod3)).To(gomega.Succeed())
+		})
+
+		wl3LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod3.Name, pod3.UID), Namespace: ns.Name}
+		wl3 := &kueue.Workload{}
+
+		ginkgo.By("Checking the Workload is created and pending while the old ones remain admitted", func() {
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, wl3LookupKey, wl3)).To(gomega.Succeed())
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			util.ExpectWorkloadsToBePending(ctx, k8sClient, wl3)
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2)
+		})
+
+		ginkgo.By("Finishing an imported pod, the new one's Workload should be admitted", func() {
+			util.SetPodsPhase(ctx, k8sClient, corev1.PodSucceeded, pod2)
+
+			util.ExpectWorkloadToFinish(ctx, k8sClient, wl2LookupKey)
+			util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl3)
+		})
+	})
+})
+
+})
diff --git a/test/integration/importer/suite_test.go b/test/integration/importer/suite_test.go
new file mode 100644
index 0000000000..2d15c1b58d
--- /dev/null
+++ b/test/integration/importer/suite_test.go
@@ -0,0 +1,102 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package importer
+
+import (
+	"context"
+	"path/filepath"
+	"testing"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"k8s.io/client-go/rest"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+
+	config "sigs.k8s.io/kueue/apis/config/v1beta1"
+	"sigs.k8s.io/kueue/pkg/cache"
+	"sigs.k8s.io/kueue/pkg/constants"
+	"sigs.k8s.io/kueue/pkg/controller/core"
+	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
+	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
+	"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
+	"sigs.k8s.io/kueue/pkg/queue"
+	"sigs.k8s.io/kueue/pkg/scheduler"
+	"sigs.k8s.io/kueue/pkg/webhooks"
+	"sigs.k8s.io/kueue/test/integration/framework"
+	// +kubebuilder:scaffold:imports
+)
+
+var (
+	cfg       *rest.Config
+	k8sClient client.Client
+	ctx       context.Context
+	fwk       *framework.Framework
+)
+
+func TestImporter(t *testing.T) {
+	gomega.RegisterFailHandler(ginkgo.Fail)
+
+	ginkgo.RunSpecs(t,
+		"Importer Suite",
+	)
+}
+
+var _ = ginkgo.BeforeSuite(func() {
+	fwk = &framework.Framework{
+		CRDPath: filepath.Join("..", "..", "..", "config", "components", "crd", "bases"),
+	}
+	cfg = fwk.Init()
+	ctx, k8sClient = fwk.SetupClient(cfg)
+})
+
+var _ = ginkgo.AfterSuite(func() {
+	fwk.Teardown()
+})
+
+func managerAndSchedulerSetup(mgr manager.Manager, ctx context.Context) {
+	err := indexer.Setup(ctx, mgr.GetFieldIndexer())
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	cCache := cache.New(mgr.GetClient())
+	queues := queue.NewManager(mgr.GetClient(), cCache)
+
+	configuration := &config.Configuration{}
+	mgr.GetScheme().Default(configuration)
+
+	failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration)
+	gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
+
+	failedWebhook, err := webhooks.Setup(mgr)
+	gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
+
+	err = workloadjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	err = pod.SetupIndexes(ctx, mgr.GetFieldIndexer())
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	podReconciler := pod.NewReconciler(
+		mgr.GetClient(),
+		mgr.GetEventRecorderFor(constants.JobControllerName))
+	err = podReconciler.SetupWithManager(mgr)
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+	sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName))
+	err = sched.Start(ctx)
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+}
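Note: end to end, the importer reduces to three calls, mirroring the integration test above (the namespace, label, and mapping values below are illustrative, not fixed by the tool):

    // Load the mapping cache, dry-run the checks, then import (sketch).
    cache, err := importerutil.LoadImportCache(ctx, k8sClient,
        []string{"ns1"}, "src.lbl", map[string]string{"src-val": "lq1"})
    if err != nil {
        return err
    }
    if err := importerpod.Check(ctx, k8sClient, cache, 8); err != nil {
        return err // at least one pod cannot be mapped to a valid queue setup
    }
    return importerpod.Import(ctx, k8sClient, cache, 8)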