From 45458ceb16b78fc4f111b99340b557eb20bbf3d1 Mon Sep 17 00:00:00 2001 From: Traian Schiau <55734665+trasc@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:05:53 +0200 Subject: [PATCH] Add pod importer (#1825) * [pod] Export `FromObject()` * [cmd/importer] Initial implementation. * Review Remarks * Add extra labels * Review Remarks * Review Remarks * Review Remarks * Review Remarks --- cmd/importer/README.md | 42 ++++ cmd/importer/main.go | 221 +++++++++++++++++ cmd/importer/pod/check.go | 73 ++++++ cmd/importer/pod/check_test.go | 137 +++++++++++ cmd/importer/pod/import.go | 218 +++++++++++++++++ cmd/importer/pod/import_test.go | 194 +++++++++++++++ cmd/importer/util/util.go | 226 ++++++++++++++++++ go.mod | 4 +- pkg/controller/jobs/pod/pod_controller.go | 6 +- .../jobs/pod/pod_controller_test.go | 8 +- pkg/controller/jobs/pod/pod_webhook.go | 8 +- test/integration/framework/framework.go | 15 +- test/integration/importer/importer_test.go | 165 +++++++++++++ test/integration/importer/suite_test.go | 102 ++++++++ 14 files changed, 1403 insertions(+), 16 deletions(-) create mode 100644 cmd/importer/README.md create mode 100644 cmd/importer/main.go create mode 100644 cmd/importer/pod/check.go create mode 100644 cmd/importer/pod/check_test.go create mode 100644 cmd/importer/pod/import.go create mode 100644 cmd/importer/pod/import_test.go create mode 100644 cmd/importer/util/util.go create mode 100644 test/integration/importer/importer_test.go create mode 100644 test/integration/importer/suite_test.go diff --git a/cmd/importer/README.md b/cmd/importer/README.md new file mode 100644 index 0000000000..5667392e5e --- /dev/null +++ b/cmd/importer/README.md @@ -0,0 +1,42 @@ +# Kueue Importer Tool + +A tool for importing existing Pods into Kueue. + +## Cluster setup + +The importer should run in a cluster that has the Kueue CRDs defined and in which the `kueue-controller-manager` is either not running or has the `pod` integration framework disabled. Check Kueue's [installation guide](https://kueue.sigs.k8s.io/docs/installation/) and [Run Plain Pods](https://kueue.sigs.k8s.io/docs/tasks/run_plain_pods/#before-you-begin) for details. + +For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to exist in the cluster; the check stage of the importer verifies this and enumerates any missing objects. + +## Build + +From the Kueue source root, run: + +```bash +go build -C cmd/importer/ -o $(pwd)/bin/importer +``` + +## Usage + +The command runs against the system's default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn how to configure access to multiple clusters. + +The importer performs the following checks: + +- At least one `namespace` is provided. +- The label key (`queuelabel`) providing the queue mapping is provided. +- A mapping from each encountered `queuelabel` value to an existing LocalQueue exists. +- The LocalQueues involved in the import use an existing ClusterQueue. +- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created Workloads. + +Then, if `--dry-run=false` was specified, for each selected Pod the importer will: + +- Update the Pod's Kueue-related labels. +- Create a Workload associated with the Pod. +- Admit the Workload.
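+
+The queue mapping can also be read from a file via `--queuemapping-file`; its entries are merged with any `--queuemapping` pairs. A minimal sketch (the file name `mapping.yaml` and the label values below are only illustrative): the file is a flat YAML map from `queuelabel` label values to LocalQueue names.
+
+```bash
+# Illustrative mapping file: keys are values of the --queuelabel label,
+# values are the target LocalQueue names.
+cat <<'EOF' > mapping.yaml
+src-val: user-queue
+src-val2: user-queue2
+EOF
+
+# Dry-run is the default, so this only runs the checks described above;
+# add --dry-run=false to perform the actual import.
+./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping-file=mapping.yaml
+```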
+ +### Example + +```bash +./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false +``` +This will import all the Pods in namespaces `ns1` or `ns2` that have the label `src.lbl` set, into the LocalQueue `user-queue` or `user-queue2`, depending on the value of `src.lbl`. diff --git a/cmd/importer/main.go b/cmd/importer/main.go new file mode 100644 index 0000000000..ef44dd2258 --- /dev/null +++ b/cmd/importer/main.go @@ -0,0 +1,221 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "errors" + "fmt" + "maps" + "os" + + "github.com/spf13/cobra" + "go.uber.org/zap/zapcore" + "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/pod" + "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/util/useragent" +) + +const ( + NamespaceFlag = "namespace" + NamespaceFlagShort = "n" + QueueMappingFlag = "queuemapping" + QueueMappingFileFlag = "queuemapping-file" + QueueLabelFlag = "queuelabel" + QPSFlag = "qps" + BurstFlag = "burst" + VerbosityFlag = "verbose" + VerboseFlagShort = "v" + ConcurrencyFlag = "concurrent-workers" + ConcurrencyFlagShort = "c" + DryRunFlag = "dry-run" + AddLabelsFlag = "add-labels" +) + +var ( + rootCmd = &cobra.Command{ + Use: "importer", + Short: "Import existing (running) objects into Kueue", + PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { + v, _ := cmd.Flags().GetCount(VerbosityFlag) + level := (v + 1) * -1 + ctrl.SetLogger(zap.New( + zap.UseDevMode(true), + zap.ConsoleEncoder(), + zap.Level(zapcore.Level(level)), + )) + return nil + }, + } +) + +func setFlags(cmd *cobra.Command) { + cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)") + cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue") + cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names") + cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads") + cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") + cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") + cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") + cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers") +
cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only") + + _ = cmd.MarkFlagRequired(QueueLabelFlag) + _ = cmd.MarkFlagRequired(NamespaceFlag) +} + +func init() { + rootCmd.AddGroup(&cobra.Group{ + ID: "pod", + Title: "Pods import", + }) + rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)") + + importCmd := &cobra.Command{ + Use: "import", + GroupID: "pod", + Short: "Checks the prerequisites and import pods.", + RunE: importCmd, + } + setFlags(importCmd) + rootCmd.AddCommand(importCmd) +} + +func main() { + err := rootCmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) (*util.ImportCache, error) { + flags := cmd.Flags() + namespaces, err := flags.GetStringSlice(NamespaceFlag) + if err != nil { + return nil, err + } + queueLabel, err := flags.GetString(QueueLabelFlag) + if err != nil { + return nil, err + } + + mapping, err := flags.GetStringToString(QueueMappingFlag) + if err != nil { + return nil, err + } + + mappingFile, err := flags.GetString(QueueMappingFileFlag) + if err != nil { + return nil, err + } + + if mappingFile != "" { + yamlFile, err := os.ReadFile(mappingFile) + if err != nil { + return nil, err + } + extraMapping := map[string]string{} + err = yaml.Unmarshal(yamlFile, extraMapping) + if err != nil { + return nil, fmt.Errorf("decoding %q: %w", mappingFile, err) + } + maps.Copy(mapping, extraMapping) + } + + addLabels, err := flags.GetStringToString(AddLabelsFlag) + if err != nil { + return nil, err + } + + var validationErrors []error + for name, value := range addLabels { + for _, err := range validation.IsQualifiedName(name) { + validationErrors = append(validationErrors, fmt.Errorf("name %q: %s", name, err)) + } + for _, err := range validation.IsValidLabelValue(value) { + validationErrors = append(validationErrors, fmt.Errorf("label %q value %q: %s", name, value, err)) + } + } + if len(validationErrors) > 0 { + return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...)) + } + + return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels) +} + +func getKubeClient(cmd *cobra.Command) (client.Client, error) { + kubeConfig, err := ctrl.GetConfig() + if err != nil { + return nil, err + } + if kubeConfig.UserAgent == "" { + kubeConfig.UserAgent = useragent.Default() + } + qps, err := cmd.Flags().GetFloat32(QPSFlag) + if err != nil { + return nil, err + } + kubeConfig.QPS = qps + bust, err := cmd.Flags().GetInt(BurstFlag) + if err != nil { + return nil, err + } + kubeConfig.Burst = bust + + if err := kueue.AddToScheme(scheme.Scheme); err != nil { + return nil, err + } + + c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return nil, err + } + return c, nil +} + +func importCmd(cmd *cobra.Command, _ []string) error { + log := ctrl.Log.WithName("import") + ctx := ctrl.LoggerInto(context.Background(), log) + cWorkers, _ := cmd.Flags().GetUint(ConcurrencyFlag) + c, err := getKubeClient(cmd) + if err != nil { + return err + } + + cache, err := loadMappingCache(ctx, c, cmd) + if err != nil { + return err + } + + if err = pod.Check(ctx, c, cache, cWorkers); err != nil { + return err + } + + if dr, _ := cmd.Flags().GetBool(DryRunFlag); dr { + fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag) + return nil + } + return pod.Import(ctx, c, cache, cWorkers) +} diff --git 
a/cmd/importer/pod/check.go b/cmd/importer/pod/check.go new file mode 100644 index 0000000000..e782d413bd --- /dev/null +++ b/cmd/importer/pod/check.go @@ -0,0 +1,73 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "errors" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/kueue/cmd/importer/util" +) + +func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error { + ch := make(chan corev1.Pod) + go func() { + err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch) + if err != nil { + ctrl.LoggerFrom(ctx).Error(err, "Listing pods") + } + }() + summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error { + log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) + log.V(3).Info("Checking") + + cq, err := cache.ClusterQueue(p) + if err != nil { + return err + } + + if len(cq.Spec.ResourceGroups) == 0 { + return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid) + } + + if len(cq.Spec.ResourceGroups[0].Flavors) == 0 { + return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid) + } + + rfName := string(cq.Spec.ResourceGroups[0].Flavors[0].Name) + rf, rfFound := cache.ResourceFalvors[rfName] + if !rfFound { + return fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid) + } + + log.V(2).Info("Successfully checked", "pod", klog.KObj(p), "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf)) + return nil + }) + + log := ctrl.LoggerFrom(ctx) + log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods) + for e, pods := range summary.ErrorsForPods { + log.Info("Validation failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0]) + } + return errors.Join(summary.Errors...) +} diff --git a/cmd/importer/pod/check_test.go b/cmd/importer/pod/check_test.go new file mode 100644 index 0000000000..d0e4e39c21 --- /dev/null +++ b/cmd/importer/pod/check_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package pod + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/util" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +const ( + testingNamespace = "ns" + testingQueueLabel = "testing.lbl" +) + +func TestCheckNamespace(t *testing.T) { + basePodWrapper := testingpod.MakePod("pod", testingNamespace). + Label(testingQueueLabel, "q1") + + baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1") + baseClusterQueue := utiltesting.MakeClusterQueue("cq1") + + cases := map[string]struct { + pods []corev1.Pod + clusteQueues []kueue.ClusterQueue + localQueues []kueue.LocalQueue + mapping map[string]string + flavors []kueue.ResourceFlavor + + wantError error + }{ + "empty cluster": {}, + "no mapping": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + wantError: util.ErrNoMapping, + }, + "no local queue": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + wantError: util.ErrLQNotFound, + }, + "no cluster queue": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + wantError: util.ErrCQNotFound, + }, + "invalid cq": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *baseClusterQueue.Obj(), + }, + wantError: util.ErrCQInvalid, + }, + "all found": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("cq1").ResourceGroup(*utiltesting.MakeFlavorQuotas("rf1").Resource(corev1.ResourceCPU, "1").Obj()).Obj(), + }, + flavors: []kueue.ResourceFlavor{ + *utiltesting.MakeResourceFlavor("rf1").Obj(), + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + podsList := corev1.PodList{Items: tc.pods} + cqList := kueue.ClusterQueueList{Items: tc.clusteQueues} + lqList := kueue.LocalQueueList{Items: tc.localQueues} + rfList := kueue.ResourceFlavorList{Items: tc.flavors} + + builder := utiltesting.NewClientBuilder() + builder = builder.WithLists(&podsList, &cqList, &lqList, &rfList) + + client := builder.Build() + ctx := context.Background() + + mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping, nil) + gotErr := Check(ctx, client, mpc, 8) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + } + }) + } +} diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go new file mode 100644 index 0000000000..06f5f73f7c --- /dev/null +++ b/cmd/importer/pod/import.go @@ -0,0 +1,218 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "errors" + "fmt" + "maps" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/constants" + controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/workload" +) + +func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error { + ch := make(chan corev1.Pod) + go func() { + err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch) + if err != nil { + ctrl.LoggerFrom(ctx).Error(err, "Listing pods") + } + }() + summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error { + log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) + log.V(3).Info("Importing") + + lq, err := cache.LocalQueue(p) + if err != nil { + return err + } + + oldLq, found := p.Labels[controllerconstants.QueueLabel] + if !found { + if err := addLabels(ctx, c, p, lq.Name, cache.AddLabels); err != nil { + return fmt.Errorf("cannot add queue label: %w", err) + } + } else if oldLq != lq.Name { + return fmt.Errorf("another local queue name is set %q expecting %q", oldLq, lq.Name) + } + + kp := pod.FromObject(p) + // Note: the recorder is not used for single pods, we can just pass nil for now. 
+ wl, err := kp.ConstructComposableWorkload(ctx, c, nil) + if err != nil { + return fmt.Errorf("construct workload: %w", err) + } + + maps.Copy(wl.Labels, cache.AddLabels) + + if pc, found := cache.PriorityClasses[p.Spec.PriorityClassName]; found { + wl.Spec.PriorityClassName = pc.Name + wl.Spec.Priority = &pc.Value + wl.Spec.PriorityClassSource = constants.PodPriorityClassSource + } + + if err := createWorkload(ctx, c, wl); err != nil { + return fmt.Errorf("creating workload: %w", err) + + } + + //make its admission and update its status + + info := workload.NewInfo(wl) + cq := cache.ClusterQueues[string(lq.Spec.ClusterQueue)] + admission := kueue.Admission{ + ClusterQueue: kueue.ClusterQueueReference(cq.Name), + PodSetAssignments: []kueue.PodSetAssignment{ + { + Name: info.TotalRequests[0].Name, + Flavors: make(map[corev1.ResourceName]kueue.ResourceFlavorReference), + ResourceUsage: info.TotalRequests[0].Requests.ToResourceList(), + Count: ptr.To[int32](1), + }, + }, + } + flv := cq.Spec.ResourceGroups[0].Flavors[0].Name + for r := range info.TotalRequests[0].Requests { + admission.PodSetAssignments[0].Flavors[r] = flv + } + + wl.Status.Admission = &admission + reservedCond := metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name), + } + apimeta.SetStatusCondition(&wl.Status.Conditions, reservedCond) + admittedCond := metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name), + } + apimeta.SetStatusCondition(&wl.Status.Conditions, admittedCond) + if err := admitWorkload(ctx, c, wl); err != nil { + return err + } + log.V(2).Info("Successfully imported", "pod", klog.KObj(p), "workload", klog.KObj(wl)) + return nil + }) + + log := ctrl.LoggerFrom(ctx) + log.Info("Import done", "checked", summary.TotalPods, "failed", summary.FailedPods) + for e, pods := range summary.ErrorsForPods { + log.Info("Import failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0]) + } + return errors.Join(summary.Errors...) 
+} + +func checkError(err error) (retry, reload bool, timeout time.Duration) { + retry_seconds, retry := apierrors.SuggestsClientDelay(err) + if retry { + return true, false, time.Duration(retry_seconds) * time.Second + } + + if apierrors.IsConflict(err) { + return true, true, 0 + } + return false, false, 0 +} + +func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string, addLabels map[string]string) error { + p.Labels[controllerconstants.QueueLabel] = queue + p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue + maps.Copy(p.Labels, addLabels) + + err := c.Update(ctx, p) + retry, reload, timeout := checkError(err) + + for retry { + if timeout >= 0 { + select { + case <-ctx.Done(): + return errors.New("context canceled") + case <-time.After(timeout): + } + } + if reload { + err = c.Get(ctx, client.ObjectKeyFromObject(p), p) + if err != nil { + retry, reload, timeout = checkError(err) + continue + } + p.Labels[controllerconstants.QueueLabel] = queue + p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue + maps.Copy(p.Labels, addLabels) + } + err = c.Update(ctx, p) + retry, reload, timeout = checkError(err) + } + return err +} + +func createWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error { + err := c.Create(ctx, wl) + if apierrors.IsAlreadyExists(err) { + return nil + } + retry, _, timeout := checkError(err) + for retry { + if timeout >= 0 { + select { + case <-ctx.Done(): + return errors.New("context canceled") + case <-time.After(timeout): + } + } + err = c.Create(ctx, wl) + retry, _, timeout = checkError(err) + } + return err +} + +func admitWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error { + err := workload.ApplyAdmissionStatus(ctx, c, wl, false) + retry, _, timeout := checkError(err) + for retry { + if timeout >= 0 { + select { + case <-ctx.Done(): + return errors.New("context canceled") + case <-time.After(timeout): + } + } + err = workload.ApplyAdmissionStatus(ctx, c, wl, false) + retry, _, timeout = checkError(err) + } + return err +} diff --git a/cmd/importer/pod/import_test.go b/cmd/importer/pod/import_test.go new file mode 100644 index 0000000000..e16970fb21 --- /dev/null +++ b/cmd/importer/pod/import_test.go @@ -0,0 +1,194 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +func TestImportNamespace(t *testing.T) { + basePodWrapper := testingpod.MakePod("pod", testingNamespace). + UID("pod"). + Label(testingQueueLabel, "q1"). 
+ Image("img", nil). + Request(corev1.ResourceCPU, "1") + + baseWlWrapper := utiltesting.MakeWorkload("pod-pod-b17ab", testingNamespace). + ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "pod"). + Label(constants.JobUIDLabel, "pod"). + Finalizers(kueue.ResourceInUseFinalizerName). + Queue("lq1"). + PodSets(*utiltesting.MakePodSet("main", 1). + Image("img"). + Request(corev1.ResourceCPU, "1"). + Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq1"). + Assignment(corev1.ResourceCPU, "f1", "1"). + Obj()). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: "Imported into ClusterQueue cq1", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: "Imported into ClusterQueue cq1", + }) + + baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1") + baseClusterQueue := utiltesting.MakeClusterQueue("cq1"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("f1").Resource(corev1.ResourceCPU, "1", "0").Obj()) + + podCmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + } + + wlCmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.IgnoreFields(metav1.Condition{}, "ObservedGeneration", "LastTransitionTime"), + } + + cases := map[string]struct { + pods []corev1.Pod + clusteQueues []kueue.ClusterQueue + localQueues []kueue.LocalQueue + mapping map[string]string + addLabels map[string]string + + wantPods []corev1.Pod + wantWorkloads []kueue.Workload + wantError error + }{ + + "create one": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *baseClusterQueue.Obj(), + }, + + wantPods: []corev1.Pod{ + *basePodWrapper.Clone(). + Label(constants.QueueLabel, "lq1"). + Label(pod.ManagedLabelKey, pod.ManagedLabelValue). + Obj(), + }, + + wantWorkloads: []kueue.Workload{ + *baseWlWrapper.Clone().Obj(), + }, + }, + "create one, add labels": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *baseClusterQueue.Obj(), + }, + addLabels: map[string]string{ + "new.lbl": "val", + }, + + wantPods: []corev1.Pod{ + *basePodWrapper.Clone(). + Label(constants.QueueLabel, "lq1"). + Label(pod.ManagedLabelKey, pod.ManagedLabelValue). + Label("new.lbl", "val"). + Obj(), + }, + + wantWorkloads: []kueue.Workload{ + *baseWlWrapper.Clone(). + Label("new.lbl", "val"). + Obj(), + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + podsList := corev1.PodList{Items: tc.pods} + cqList := kueue.ClusterQueueList{Items: tc.clusteQueues} + lqList := kueue.LocalQueueList{Items: tc.localQueues} + + builder := utiltesting.NewClientBuilder(). + WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}).WithStatusSubresource(&kueue.Workload{}). 
+ WithLists(&podsList, &cqList, &lqList) + + client := builder.Build() + ctx := context.Background() + + mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping, tc.addLabels) + gotErr := Import(ctx, client, mpc, 8) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + } + + err := client.List(ctx, &podsList) + if err != nil { + t.Errorf("Unexpected list pod error: %s", err) + } + if diff := cmp.Diff(tc.wantPods, podsList.Items, podCmpOpts...); diff != "" { + t.Errorf("Unexpected pods (-want/+got)\n%s", diff) + } + + wlList := kueue.WorkloadList{} + err = client.List(ctx, &wlList) + if err != nil { + t.Errorf("Unexpected list workloads error: %s", err) + } + if diff := cmp.Diff(tc.wantWorkloads, wlList.Items, wlCmpOpts...); diff != "" { + t.Errorf("Unexpected workloads (-want/+got)\n%s", diff) + } + + }) + } +} diff --git a/cmd/importer/util/util.go b/cmd/importer/util/util.go new file mode 100644 index 0000000000..ffb4472a4e --- /dev/null +++ b/cmd/importer/util/util.go @@ -0,0 +1,226 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "context" + "errors" + "fmt" + "maps" + "slices" + "sync" + + corev1 "k8s.io/api/core/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + utilslices "sigs.k8s.io/kueue/pkg/util/slices" +) + +const ( + ListLength = 100 +) + +var ( + ErrNoMappingKey = errors.New("no mapping key found") + ErrNoMapping = errors.New("no mapping found") + ErrLQNotFound = errors.New("localqueue not found") + ErrCQNotFound = errors.New("clusterqueue not found") + ErrCQInvalid = errors.New("clusterqueue invalid") +) + +func listOptions(namespace, queueLabel, continueToken string) []client.ListOption { + opts := []client.ListOption{ + client.InNamespace(namespace), + client.HasLabels{queueLabel}, + client.Limit(ListLength), + client.Continue(continueToken), + } + return opts +} + +type ImportCache struct { + Namespaces []string + QueueLabel string + Mapping map[string]string + LocalQueues map[string]map[string]*kueue.LocalQueue + ClusterQueues map[string]*kueue.ClusterQueue + ResourceFalvors map[string]*kueue.ResourceFlavor + PriorityClasses map[string]*schedulingv1.PriorityClass + AddLabels map[string]string +} + +func LoadImportCache(ctx context.Context, c client.Client, namespaces []string, queueLabel string, mapping, addLabels map[string]string) (*ImportCache, error) { + ret := ImportCache{ + Namespaces: slices.Clone(namespaces), + QueueLabel: queueLabel, + Mapping: maps.Clone(mapping), + LocalQueues: make(map[string]map[string]*kueue.LocalQueue), + AddLabels: addLabels, + } + + // get the cluster queues + cqList := &kueue.ClusterQueueList{} + if err := c.List(ctx, cqList); err != nil { + return nil, fmt.Errorf("loading cluster queues: %w", err) + } + 
ret.ClusterQueues = utilslices.ToRefMap(cqList.Items, func(cq *kueue.ClusterQueue) string { return cq.Name }) + + // get the local queues + for _, ns := range namespaces { + lqList := &kueue.LocalQueueList{} + if err := c.List(ctx, lqList); err != nil { + return nil, fmt.Errorf("loading local queues in namespace %s: %w", ns, err) + } + ret.LocalQueues[ns] = utilslices.ToRefMap(lqList.Items, func(lq *kueue.LocalQueue) string { return lq.Name }) + } + + // ResourceFlavors + rfList := &kueue.ResourceFlavorList{} + if err := c.List(ctx, rfList); err != nil { + return nil, fmt.Errorf("loading resource flavors: %w", err) + } + ret.ResourceFalvors = utilslices.ToRefMap(rfList.Items, func(rf *kueue.ResourceFlavor) string { return rf.Name }) + + // PriorityClasses + pcList := &schedulingv1.PriorityClassList{} + if err := c.List(ctx, pcList); err != nil { + return nil, fmt.Errorf("loading resource flavors: %w", err) + } + ret.PriorityClasses = utilslices.ToRefMap(pcList.Items, func(pc *schedulingv1.PriorityClass) string { return pc.Name }) + return &ret, nil +} + +func (mappingCache *ImportCache) LocalQueue(p *corev1.Pod) (*kueue.LocalQueue, error) { + mappingKey, found := p.Labels[mappingCache.QueueLabel] + if !found { + return nil, ErrNoMappingKey + } + + queueName, found := mappingCache.Mapping[mappingKey] + if !found { + return nil, fmt.Errorf("%s: %w", mappingKey, ErrNoMapping) + } + + nqQueues, found := mappingCache.LocalQueues[p.Namespace] + if !found { + return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound) + } + + lq, found := nqQueues[queueName] + if !found { + return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound) + } + return lq, nil +} + +func (mappingCache *ImportCache) ClusterQueue(p *corev1.Pod) (*kueue.ClusterQueue, error) { + lq, err := mappingCache.LocalQueue(p) + if err != nil { + return nil, err + } + queueName := string(lq.Spec.ClusterQueue) + cq, found := mappingCache.ClusterQueues[queueName] + if !found { + return nil, fmt.Errorf("cluster queue: %s: %w", queueName, ErrCQNotFound) + } + return cq, nil +} + +func PushPods(ctx context.Context, c client.Client, namespaces []string, queueLabel string, ch chan<- corev1.Pod) error { + defer close(ch) + for _, ns := range namespaces { + lst := &corev1.PodList{} + log := ctrl.LoggerFrom(ctx).WithValues("namespace", ns) + log.V(3).Info("Begin pods list") + defer log.V(3).Info("End pods list") + page := 0 + for { + err := c.List(ctx, lst, listOptions(ns, queueLabel, lst.Continue)...) 
+ if err != nil { + log.Error(err, "list") + return fmt.Errorf("listing pods in %s, page %d: %w", ns, page, err) + } + + for _, p := range lst.Items { + if p.Status.Phase == corev1.PodFailed || p.Status.Phase == corev1.PodSucceeded { + log.V(2).Info("Skip pod", "pod", klog.KObj(&p), "phase", p.Status.Phase) + } else { + ch <- p + } + } + + page++ + if lst.Continue == "" { + log.V(2).Info("No more pods", "pages", page) + break + } + } + } + return nil +} + +type Result struct { + Pod string + Err error +} + +type ProcessSummary struct { + TotalPods int + FailedPods int + ErrorsForPods map[string][]string + Errors []error +} + +func ConcurrentProcessPod(ch <-chan corev1.Pod, jobs uint, f func(p *corev1.Pod) error) ProcessSummary { + wg := sync.WaitGroup{} + resultCh := make(chan Result) + + wg.Add(int(jobs)) + for i := 0; i < int(jobs); i++ { + go func() { + defer wg.Done() + for pod := range ch { + err := f(&pod) + resultCh <- Result{Pod: client.ObjectKeyFromObject(&pod).String(), Err: err} + } + }() + } + go func() { + wg.Wait() + close(resultCh) + }() + + ps := ProcessSummary{ + ErrorsForPods: make(map[string][]string), + } + for result := range resultCh { + ps.TotalPods++ + if result.Err != nil { + ps.FailedPods++ + estr := result.Err.Error() + if _, found := ps.ErrorsForPods[estr]; !found { + ps.Errors = append(ps.Errors, result.Err) + } + ps.ErrorsForPods[estr] = append(ps.ErrorsForPods[estr], result.Pod) + } + } + return ps +} diff --git a/go.mod b/go.mod index c3337d6b88..fd247ccee8 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,9 @@ require ( github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.6.0 github.com/ray-project/kuberay/ray-operator v1.1.0-rc.1 + github.com/spf13/cobra v1.8.0 go.uber.org/zap v1.27.0 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/apiserver v0.29.2 @@ -82,7 +84,6 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect go.etcd.io/etcd/api/v3 v3.5.11 // indirect @@ -120,7 +121,6 @@ require ( google.golang.org/protobuf v1.32.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.0 // indirect k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 542d11473e..b6161ecaeb 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -144,7 +144,7 @@ var ( _ jobframework.ComposableJob = (*Pod)(nil) ) -func fromObject(o runtime.Object) *Pod { +func FromObject(o runtime.Object) *Pod { out := Pod{} out.pod = *o.(*corev1.Pod) return &out @@ -429,7 +429,7 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, if !podsInGroup[i].DeletionTimestamp.IsZero() || (stopReason != jobframework.StopReasonWorkloadDeleted && podSuspended(&podsInGroup[i])) { continue } - podInGroup := fromObject(&podsInGroup[i]) + podInGroup := FromObject(&podsInGroup[i]) // The podset info is not relevant here, since this should mark the pod's end of life pCopy := &corev1.Pod{ @@ -654,7 +654,7 @@ func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) { } if 
!podRoleFound { - podSet := fromObject(&podInGroup).PodSets() + podSet := FromObject(&podInGroup).PodSets() podSet[0].Name = roleHash resultPodSets = append(resultPodSets, podSet[0]) diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index c268f46838..640d7e273c 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -74,7 +74,7 @@ func TestPodsReady(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - pod := fromObject(tc.pod) + pod := FromObject(tc.pod) got := pod.PodsReady() if tc.want != got { t.Errorf("Unexpected response (want: %v, got: %v)", tc.want, got) @@ -98,7 +98,7 @@ func TestRun(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - pod := fromObject(&tc.pods[0]) + pod := FromObject(&tc.pods[0]) ctx, _ := utiltesting.ContextWithLog(t) clientBuilder := utiltesting.NewClientBuilder() @@ -3554,9 +3554,9 @@ func TestIsPodOwnerManagedByQueue(t *testing.T) { pod.OwnerReferences = append(pod.OwnerReferences, tc.ownerReference) - if tc.wantRes != IsPodOwnerManagedByKueue(fromObject(pod)) { + if tc.wantRes != IsPodOwnerManagedByKueue(FromObject(pod)) { t.Errorf("Unexpected 'IsPodOwnerManagedByKueue' result\n want: %t\n got: %t)", - tc.wantRes, IsPodOwnerManagedByKueue(fromObject(pod))) + tc.wantRes, IsPodOwnerManagedByKueue(FromObject(pod))) } }) } diff --git a/pkg/controller/jobs/pod/pod_webhook.go b/pkg/controller/jobs/pod/pod_webhook.go index b81ae06563..ecee0c6a90 100644 --- a/pkg/controller/jobs/pod/pod_webhook.go +++ b/pkg/controller/jobs/pod/pod_webhook.go @@ -133,7 +133,7 @@ func (p *Pod) addRoleHash() error { } func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { - pod := fromObject(obj) + pod := FromObject(obj) log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&pod.pod)) log.V(5).Info("Applying defaults") @@ -201,7 +201,7 @@ var _ webhook.CustomValidator = &PodWebhook{} func (w *PodWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { var warnings admission.Warnings - pod := fromObject(obj) + pod := FromObject(obj) log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&pod.pod)) log.V(5).Info("Validating create") allErrs := jobframework.ValidateCreateForQueueName(pod) @@ -220,8 +220,8 @@ func (w *PodWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (ad func (w *PodWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { var warnings admission.Warnings - oldPod := fromObject(oldObj) - newPod := fromObject(newObj) + oldPod := FromObject(oldObj) + newPod := FromObject(newObj) log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&newPod.pod)) log.V(5).Info("Validating update") allErrs := jobframework.ValidateUpdateForQueueName(oldPod, newPod) diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go index 92ccc216f1..d465781df9 100644 --- a/test/integration/framework/framework.go +++ b/test/integration/framework/framework.go @@ -85,7 +85,7 @@ func (f *Framework) Init() *rest.Config { return cfg } -func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) { +func (f *Framework) SetupClient(cfg *rest.Config) (context.Context, client.Client) { err := config.AddToScheme(scheme.Scheme) gomega.ExpectWithOffset(1, 
err).NotTo(gomega.HaveOccurred()) @@ -116,6 +116,13 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) gomega.ExpectWithOffset(1, k8sClient).NotTo(gomega.BeNil()) + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + + return ctx, k8sClient +} + +func (f *Framework) StartManager(ctx context.Context, cfg *rest.Config, managerSetup ManagerSetup) { webhookInstallOptions := &f.testEnv.WebhookInstallOptions mgrOpts := manager.Options{ Scheme: scheme.Scheme, @@ -132,8 +139,6 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con mgr, err := ctrl.NewManager(cfg, mgrOpts) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to create manager") - ctx, cancel := context.WithCancel(context.Background()) - f.cancel = cancel managerSetup(mgr, ctx) go func() { @@ -155,7 +160,11 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con return nil }).Should(gomega.Succeed()) } +} +func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) { + ctx, k8sClient := f.SetupClient(cfg) + f.StartManager(ctx, cfg, managerSetup) return ctx, k8sClient } diff --git a/test/integration/importer/importer_test.go b/test/integration/importer/importer_test.go new file mode 100644 index 0000000000..1f0a9c9bab --- /dev/null +++ b/test/integration/importer/importer_test.go @@ -0,0 +1,165 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package importer + +import ( + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + importerpod "sigs.k8s.io/kueue/cmd/importer/pod" + importerutil "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/metrics" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + utiltestingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + "sigs.k8s.io/kueue/test/util" +) + +var _ = ginkgo.Describe("Importer", func() { + var ( + ns *corev1.Namespace + flavor *kueue.ResourceFlavor + cq *kueue.ClusterQueue + lq *kueue.LocalQueue + ) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "import-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + flavor = utiltesting.MakeResourceFlavor("f1").Obj() + gomega.Expect(k8sClient.Create(ctx, flavor)).To(gomega.Succeed()) + + cq = utiltesting.MakeClusterQueue("cq1"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("f1").Resource(corev1.ResourceCPU, "4").Obj(), + ). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed()) + + lq = utiltesting.MakeLocalQueue("lq1", ns.Name).ClusterQueue("cq1").Obj() + gomega.Expect(k8sClient.Create(ctx, lq)).To(gomega.Succeed()) + + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cq, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor, true) + }) + + ginkgo.When("Kueue is started after import", func() { + ginkgo.It("Should keep the imported pods admitted", func() { + + pod1 := utiltestingpod.MakePod("pod1", ns.Name). + Label("src.lbl", "src-val"). + Request(corev1.ResourceCPU, "2"). + Obj() + pod2 := utiltestingpod.MakePod("pod2", ns.Name). + Label("src.lbl", "src-val"). + Request(corev1.ResourceCPU, "2"). + Obj() + + ginkgo.By("Creating the initial pods", func() { + gomega.Expect(k8sClient.Create(ctx, pod1)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, pod2)).To(gomega.Succeed()) + }) + + ginkgo.By("Running the import", func() { + mapping, err := importerutil.LoadImportCache(ctx, k8sClient, []string{ns.Name}, "src.lbl", map[string]string{"src-val": "lq1"}, nil) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(mapping).ToNot(gomega.BeNil()) + + gomega.Expect(importerpod.Check(ctx, k8sClient, mapping, 8)).To(gomega.Succeed()) + gomega.Expect(importerpod.Import(ctx, k8sClient, mapping, 8)).To(gomega.Succeed()) + }) + + wl1LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod1.Name, pod1.UID), Namespace: ns.Name} + wl2LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod2.Name, pod2.UID), Namespace: ns.Name} + wl1 := &kueue.Workload{} + wl2 := &kueue.Workload{} + + ginkgo.By("Checking the Workloads are created and admitted", func() { + gomega.Expect(k8sClient.Get(ctx, wl1LookupKey, wl1)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, wl2LookupKey, wl2)).To(gomega.Succeed()) + + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2) + }) + + wl1UID := wl1.UID + wl2UID := wl2.UID + + ginkgo.By("Starting kueue, the cluster queue status should account for the imported Workloads", func() { + fwk.StartManager(ctx, cfg, managerAndSchedulerSetup) + + util.ExpectClusterQueueStatusMetric(cq, metrics.CQStatusActive) + gomega.Eventually(func(g gomega.Gomega) { + updatedQueue := &kueue.ClusterQueue{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), updatedQueue)).To(gomega.Succeed()) + g.Expect(updatedQueue.Status.AdmittedWorkloads).To(gomega.Equal(int32(2))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + pod3 := utiltestingpod.MakePod("pod3", ns.Name). + Queue("lq1"). + Label("kueue.x-k8s.io/managed", "true"). + Request(corev1.ResourceCPU, "2"). + KueueSchedulingGate(). 
+ Obj() + + ginkgo.By("Creating a new pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod3)).To(gomega.Succeed()) + }) + + wl3LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod3.Name, pod3.UID), Namespace: ns.Name} + wl3 := &kueue.Workload{} + + ginkgo.By("Checking the Workload is created and pending while the old ones remain admitted", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wl3LookupKey, wl3)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + util.ExpectWorkloadsToBePending(ctx, k8sClient, wl3) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2) + }) + + ginkgo.By("By finishing an imported pod, the new one's Workload should be admitted", func() { + util.SetPodsPhase(ctx, k8sClient, corev1.PodSucceeded, pod2) + + util.ExpectWorkloadToFinish(ctx, k8sClient, wl2LookupKey) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl3) + }) + + ginkgo.By("Checking the imported Workloads are not recreated", func() { + gomega.Expect(k8sClient.Get(ctx, wl1LookupKey, wl1)).To(gomega.Succeed()) + gomega.Expect(wl1.UID).To(gomega.Equal(wl1UID)) + gomega.Expect(k8sClient.Get(ctx, wl2LookupKey, wl2)).To(gomega.Succeed()) + gomega.Expect(wl2.UID).To(gomega.Equal(wl2UID)) + }) + }) + }) + +}) diff --git a/test/integration/importer/suite_test.go b/test/integration/importer/suite_test.go new file mode 100644 index 0000000000..2d15c1b58d --- /dev/null +++ b/test/integration/importer/suite_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package importer + +import ( + "context" + "path/filepath" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + config "sigs.k8s.io/kueue/apis/config/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/core/indexer" + workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/scheduler" + "sigs.k8s.io/kueue/pkg/webhooks" + "sigs.k8s.io/kueue/test/integration/framework" + // +kubebuilder:scaffold:imports +) + +var ( + cfg *rest.Config + k8sClient client.Client + ctx context.Context + fwk *framework.Framework +) + +func TestScheduler(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + ginkgo.RunSpecs(t, + "Importer Suite", + ) +} + +var _ = ginkgo.BeforeSuite(func() { + fwk = &framework.Framework{ + CRDPath: filepath.Join("..", "..", "..", "config", "components", "crd", "bases"), + } + cfg = fwk.Init() + ctx, k8sClient = fwk.SetupClient(cfg) +}) + +var _ = ginkgo.AfterSuite(func() { + fwk.Teardown() +}) + +func managerAndSchedulerSetup(mgr manager.Manager, ctx context.Context) { + err := indexer.Setup(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) + + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) + + err = workloadjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = pod.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + podReconciler := pod.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName)) + err = podReconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) + err = sched.Start(ctx) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) +}