From 87a4306bed68490ae3c07ead4724f5375f79eced Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Mon, 18 Mar 2024 11:29:45 +0200 Subject: [PATCH 1/8] [pod] Export `FromObject()` --- pkg/controller/jobs/pod/pod_controller.go | 6 +++--- pkg/controller/jobs/pod/pod_controller_test.go | 8 ++++---- pkg/controller/jobs/pod/pod_webhook.go | 8 ++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index 542d11473e..b6161ecaeb 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -144,7 +144,7 @@ var ( _ jobframework.ComposableJob = (*Pod)(nil) ) -func fromObject(o runtime.Object) *Pod { +func FromObject(o runtime.Object) *Pod { out := Pod{} out.pod = *o.(*corev1.Pod) return &out @@ -429,7 +429,7 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo, if !podsInGroup[i].DeletionTimestamp.IsZero() || (stopReason != jobframework.StopReasonWorkloadDeleted && podSuspended(&podsInGroup[i])) { continue } - podInGroup := fromObject(&podsInGroup[i]) + podInGroup := FromObject(&podsInGroup[i]) // The podset info is not relevant here, since this should mark the pod's end of life pCopy := &corev1.Pod{ @@ -654,7 +654,7 @@ func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) { } if !podRoleFound { - podSet := fromObject(&podInGroup).PodSets() + podSet := FromObject(&podInGroup).PodSets() podSet[0].Name = roleHash resultPodSets = append(resultPodSets, podSet[0]) diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go index c268f46838..640d7e273c 100644 --- a/pkg/controller/jobs/pod/pod_controller_test.go +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -74,7 +74,7 @@ func TestPodsReady(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - pod := fromObject(tc.pod) + pod := FromObject(tc.pod) got := pod.PodsReady() if tc.want != got { t.Errorf("Unexpected response (want: %v, got: %v)", tc.want, got) @@ -98,7 +98,7 @@ func TestRun(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - pod := fromObject(&tc.pods[0]) + pod := FromObject(&tc.pods[0]) ctx, _ := utiltesting.ContextWithLog(t) clientBuilder := utiltesting.NewClientBuilder() @@ -3554,9 +3554,9 @@ func TestIsPodOwnerManagedByQueue(t *testing.T) { pod.OwnerReferences = append(pod.OwnerReferences, tc.ownerReference) - if tc.wantRes != IsPodOwnerManagedByKueue(fromObject(pod)) { + if tc.wantRes != IsPodOwnerManagedByKueue(FromObject(pod)) { t.Errorf("Unexpected 'IsPodOwnerManagedByKueue' result\n want: %t\n got: %t)", - tc.wantRes, IsPodOwnerManagedByKueue(fromObject(pod))) + tc.wantRes, IsPodOwnerManagedByKueue(FromObject(pod))) } }) } diff --git a/pkg/controller/jobs/pod/pod_webhook.go b/pkg/controller/jobs/pod/pod_webhook.go index b81ae06563..ecee0c6a90 100644 --- a/pkg/controller/jobs/pod/pod_webhook.go +++ b/pkg/controller/jobs/pod/pod_webhook.go @@ -133,7 +133,7 @@ func (p *Pod) addRoleHash() error { } func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { - pod := fromObject(obj) + pod := FromObject(obj) log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&pod.pod)) log.V(5).Info("Applying defaults") @@ -201,7 +201,7 @@ var _ webhook.CustomValidator = &PodWebhook{} func (w *PodWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { var warnings 
admission.Warnings
 
-	pod := fromObject(obj)
+	pod := FromObject(obj)
 	log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&pod.pod))
 	log.V(5).Info("Validating create")
 	allErrs := jobframework.ValidateCreateForQueueName(pod)
@@ -220,8 +220,8 @@ func (w *PodWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) {
 	var warnings admission.Warnings
 
-	oldPod := fromObject(oldObj)
-	newPod := fromObject(newObj)
+	oldPod := FromObject(oldObj)
+	newPod := FromObject(newObj)
 	log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(&newPod.pod))
 	log.V(5).Info("Validating update")
 	allErrs := jobframework.ValidateUpdateForQueueName(oldPod, newPod)

From 452521302c32ad109387106e2e05830609f28847 Mon Sep 17 00:00:00 2001
From: Traian Schiau
Date: Thu, 7 Mar 2024 14:26:25 +0200
Subject: [PATCH 2/8] [cmd/importer] Initial implementation.

---
 cmd/importer/README.md                     |  40 ++++
 cmd/importer/main.go                       | 175 ++++++++++++++++
 cmd/importer/pod/check.go                  |  74 +++++++
 cmd/importer/pod/check_test.go             | 137 +++++++++++++
 cmd/importer/pod/import.go                 | 212 ++++++++++++++++++++
 cmd/importer/pod/import_test.go            | 162 +++++++++++++++
 cmd/importer/util/util.go                  | 220 +++++++++++++++++++++
 go.mod                                     |   4 +-
 test/integration/framework/framework.go    |  15 +-
 test/integration/importer/importer_test.go | 155 +++++++++++++++
 test/integration/importer/suite_test.go    | 102 ++++++++++
 11 files changed, 1291 insertions(+), 5 deletions(-)
 create mode 100644 cmd/importer/README.md
 create mode 100644 cmd/importer/main.go
 create mode 100644 cmd/importer/pod/check.go
 create mode 100644 cmd/importer/pod/check_test.go
 create mode 100644 cmd/importer/pod/import.go
 create mode 100644 cmd/importer/pod/import_test.go
 create mode 100644 cmd/importer/util/util.go
 create mode 100644 test/integration/importer/importer_test.go
 create mode 100644 test/integration/importer/suite_test.go

diff --git a/cmd/importer/README.md b/cmd/importer/README.md
new file mode 100644
index 0000000000..3f0d97c90d
--- /dev/null
+++ b/cmd/importer/README.md
@@ -0,0 +1,40 @@
+# Kueue Importer Tool
+
+A tool able to import existing pods into Kueue.
+
+## Cluster setup
+
+The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration disabled.
+
+## Build
+
+From kueue source root run:
+```bash
+go build -C cmd/importer/ -o $(pwd)/bin/importer
+
+```
+
+## Usage
+
+The command runs against the system's default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to Configure Access to Multiple Clusters.
+
+The importer will perform the following checks:
+
+- At least one `namespace` is provided.
+- The label key (`queuelabel`) used for the queue mapping is provided.
+- A mapping from each of the encountered `queuelabel` values to an existing LocalQueue exists.
+- The LocalQueues involved in the import use an existing ClusterQueue.
+- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads.
+
+After which, if `--dry-run=false` was specified, for each selected Pod the importer will:
+
+- Update the Pod's Kueue-related labels.
+- Create a Workload associated with the Pod.
+- Admit the Workload.
+
+### Example
+
+```bash
+./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false
+```
+ Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set.
\ No newline at end of file
diff --git a/cmd/importer/main.go b/cmd/importer/main.go
new file mode 100644
index 0000000000..d448c6673a
--- /dev/null
+++ b/cmd/importer/main.go
@@ -0,0 +1,175 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+
+	"github.com/spf13/cobra"
+	"go.uber.org/zap/zapcore"
+	"k8s.io/client-go/kubernetes/scheme"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/cmd/importer/pod"
+	"sigs.k8s.io/kueue/cmd/importer/util"
+	"sigs.k8s.io/kueue/pkg/util/useragent"
+)
+
+const (
+	NamespaceFlag      = "namespace"
+	NamespaceFlagShort = "n"
+	QueueMappingFlag   = "queuemapping"
+	QueueLabelFlag     = "queuelabel"
+	QPSFlag            = "qps"
+	BurstFlag          = "burst"
+	VerbosityFlag      = "verbose"
+	VerboseFlagShort   = "v"
+	JobsFlag           = "jobs"
+	JobsFlagShort      = "j"
+	DryRunFlag         = "dry-run"
+)
+
+var (
+	rootCmd = &cobra.Command{
+		Use:   "importer",
+		Short: "Import existing (running) objects into Kueue",
+		PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
+			v, _ := cmd.Flags().GetCount(VerbosityFlag)
+			level := (v + 1) * -1
+			ctrl.SetLogger(zap.New(
+				zap.UseDevMode(true),
+				zap.ConsoleEncoder(),
+				zap.Level(zapcore.Level(level)),
+			))
+			return nil
+		},
+	}
+)
+
+func setFlags(cmd *cobra.Command) {
+	cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)")
+	cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue")
+	cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names")
+	cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
+	cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
+	cmd.Flags().UintP(JobsFlag, JobsFlagShort, 8, "number of concurrent jobs")
+	cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only")
+
+	_ = cmd.MarkFlagRequired(QueueLabelFlag)
+	_ = cmd.MarkFlagRequired(NamespaceFlag)
+}
+
+func init() {
+	rootCmd.AddGroup(&cobra.Group{
+		ID:    "pod",
+		Title: "Pods import",
+	})
+	rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)")
+
+	importCmd := &cobra.Command{
+		Use:     "import",
+		GroupID: "pod",
+		Short:   "Check the prerequisites and import pods.",
+		RunE:
importCmd,
+	}
+	setFlags(importCmd)
+	rootCmd.AddCommand(importCmd)
+}
+
+func main() {
+	err := rootCmd.Execute()
+	if err != nil {
+		os.Exit(1)
+	}
+}
+
+func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) (*util.ImportCache, error) {
+	flags := cmd.Flags()
+	namespaces, err := flags.GetStringSlice(NamespaceFlag)
+	if err != nil {
+		return nil, err
+	}
+	queueLabel, err := flags.GetString(QueueLabelFlag)
+	if err != nil {
+		return nil, err
+	}
+	mapping, err := flags.GetStringToString(QueueMappingFlag)
+	if err != nil {
+		return nil, err
+	}
+	return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping)
+}
+
+func getKubeClient(cmd *cobra.Command) (client.Client, error) {
+	kubeConfig, err := ctrl.GetConfig()
+	if err != nil {
+		return nil, err
+	}
+	if kubeConfig.UserAgent == "" {
+		kubeConfig.UserAgent = useragent.Default()
+	}
+	qps, err := cmd.Flags().GetFloat32(QPSFlag)
+	if err != nil {
+		return nil, err
+	}
+	kubeConfig.QPS = qps
+	burst, err := cmd.Flags().GetInt(BurstFlag)
+	if err != nil {
+		return nil, err
+	}
+	kubeConfig.Burst = burst
+
+	if err := kueue.AddToScheme(scheme.Scheme); err != nil {
+		return nil, err
+	}
+
+	c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme})
+	if err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+func importCmd(cmd *cobra.Command, _ []string) error {
+	log := ctrl.Log.WithName("import")
+	ctx := ctrl.LoggerInto(context.Background(), log)
+	jobs, _ := cmd.Flags().GetUint(JobsFlag)
+	c, err := getKubeClient(cmd)
+	if err != nil {
+		return err
+	}
+
+	cache, err := loadMappingCache(ctx, c, cmd)
+	if err != nil {
+		return err
+	}
+
+	if err = pod.Check(ctx, c, cache, jobs); err != nil {
+		return err
+	}
+
+	if dr, _ := cmd.Flags().GetBool(DryRunFlag); dr {
+		fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag)
+		return nil
+	}
+	return pod.Import(ctx, c, cache, jobs)
+}
diff --git a/cmd/importer/pod/check.go b/cmd/importer/pod/check.go
new file mode 100644
index 0000000000..630cf9d453
--- /dev/null
+++ b/cmd/importer/pod/check.go
@@ -0,0 +1,74 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package pod + +import ( + "context" + "errors" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/kueue/cmd/importer/util" +) + +func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error { + ch := make(chan corev1.Pod) + go func() { + err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch) + if err != nil { + ctrl.LoggerFrom(ctx).Error(err, "Listing pods") + } + }() + summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error { + log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) + log.V(3).Info("Checking") + + cq, err := cache.ClusterQueue(p) + if err != nil { + return err + } + + if len(cq.Spec.ResourceGroups) == 0 { + return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid) + } + + if len(cq.Spec.ResourceGroups[0].Flavors) == 0 { + return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid) + } + + rf := string(cq.Spec.ResourceGroups[0].Flavors[0].Name) + if _, found := cache.ResourceFalvors[rf]; !found { + return fmt.Errorf("%q flavor %q: %w", cq.Name, rf, util.ErrCQInvalid) + } + + // do some additional checks like: + // - (maybe) the resources managed by the queues + + return nil + }) + + fmt.Printf("Checked %d pods\n", summary.TotalPods) + fmt.Printf("Failed %d pods\n", summary.FailedPods) + for e, pods := range summary.ErrorsForPods { + fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0]) + } + return errors.Join(summary.Errors...) +} diff --git a/cmd/importer/pod/check_test.go b/cmd/importer/pod/check_test.go new file mode 100644 index 0000000000..975842e25f --- /dev/null +++ b/cmd/importer/pod/check_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/cmd/importer/util" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +const ( + testingNamespace = "ns" + testingQueueLabel = "testing.lbl" +) + +func TestCheckNamespace(t *testing.T) { + basePodWrapper := testingpod.MakePod("pod", testingNamespace). 
+ Label(testingQueueLabel, "q1") + + baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1") + baseClusterQueue := utiltesting.MakeClusterQueue("cq1") + + cases := map[string]struct { + pods []corev1.Pod + clusteQueues []kueue.ClusterQueue + localQueues []kueue.LocalQueue + mapping map[string]string + flavors []kueue.ResourceFlavor + + wantError error + }{ + "empty cluster": {}, + "no mapping": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + wantError: util.ErrNoMapping, + }, + "no local queue": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + wantError: util.ErrLQNotFound, + }, + "no cluster queue": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + wantError: util.ErrCQNotFound, + }, + "invalid cq": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *baseClusterQueue.Obj(), + }, + wantError: util.ErrCQInvalid, + }, + "all found": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("cq1").ResourceGroup(*utiltesting.MakeFlavorQuotas("rf1").Resource(corev1.ResourceCPU, "1").Obj()).Obj(), + }, + flavors: []kueue.ResourceFlavor{ + *utiltesting.MakeResourceFlavor("rf1").Obj(), + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + podsList := corev1.PodList{Items: tc.pods} + cqList := kueue.ClusterQueueList{Items: tc.clusteQueues} + lqList := kueue.LocalQueueList{Items: tc.localQueues} + rfList := kueue.ResourceFlavorList{Items: tc.flavors} + + builder := utiltesting.NewClientBuilder() + builder = builder.WithLists(&podsList, &cqList, &lqList, &rfList) + + client := builder.Build() + ctx := context.Background() + + mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping) + gotErr := Check(ctx, client, mpc, 8) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + } + }) + } +} diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go new file mode 100644 index 0000000000..db46d88e58 --- /dev/null +++ b/cmd/importer/pod/import.go @@ -0,0 +1,212 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package pod
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/cmd/importer/util"
+	"sigs.k8s.io/kueue/pkg/constants"
+	controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants"
+	"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
+	"sigs.k8s.io/kueue/pkg/workload"
+)
+
+func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
+	ch := make(chan corev1.Pod)
+	go func() {
+		err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
+		if err != nil {
+			ctrl.LoggerFrom(ctx).Error(err, "Listing pods")
+		}
+	}()
+	summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
+		log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
+		log.V(3).Info("Importing")
+
+		lq, err := cache.LocalQueue(p)
+		if err != nil {
+			return err
+		}
+
+		oldLq, found := p.Labels[controllerconstants.QueueLabel]
+		if !found {
+			if err := addLabels(ctx, c, p, lq.Name); err != nil {
+				return fmt.Errorf("cannot add queue label: %w", err)
+			}
+		} else if oldLq != lq.Name {
+			return fmt.Errorf("another local queue name is already set: %q, expected %q", oldLq, lq.Name)
+		}
+
+		kp := pod.FromObject(p)
+		// Note: the recorder is not used for single pods, we can just pass nil for now.
+		wl, err := kp.ConstructComposableWorkload(ctx, c, nil)
+		if err != nil {
+			return fmt.Errorf("construct workload: %w", err)
+		}
+
+		if pc, found := cache.PriorityClasses[p.Spec.PriorityClassName]; found {
+			wl.Spec.PriorityClassName = pc.Name
+			wl.Spec.Priority = &pc.Value
+			wl.Spec.PriorityClassSource = constants.PodPriorityClassSource
+		}
+
+		if err := createWorkload(ctx, c, wl); err != nil {
+			return fmt.Errorf("creating workload: %w", err)
+
+		}
+
+		// Make its admission and update its status.
+
+		info := workload.NewInfo(wl)
+		cq := cache.ClusterQueues[string(lq.Spec.ClusterQueue)]
+		admission := kueue.Admission{
+			ClusterQueue: kueue.ClusterQueueReference(cq.Name),
+			PodSetAssignments: []kueue.PodSetAssignment{
+				{
+					Name:          info.TotalRequests[0].Name,
+					Flavors:       make(map[corev1.ResourceName]kueue.ResourceFlavorReference),
+					ResourceUsage: info.TotalRequests[0].Requests.ToResourceList(),
+					Count:         ptr.To[int32](1),
+				},
+			},
+		}
+		flv := cq.Spec.ResourceGroups[0].Flavors[0].Name
+		for r := range info.TotalRequests[0].Requests {
+			admission.PodSetAssignments[0].Flavors[r] = flv
+		}
+
+		wl.Status.Admission = &admission
+		reservedCond := metav1.Condition{
+			Type:    kueue.WorkloadQuotaReserved,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name),
+		}
+		apimeta.SetStatusCondition(&wl.Status.Conditions, reservedCond)
+		admittedCond := metav1.Condition{
+			Type:    kueue.WorkloadAdmitted,
+			Status:  metav1.ConditionTrue,
+			Reason:  "Imported",
+			Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name),
+		}
+		apimeta.SetStatusCondition(&wl.Status.Conditions, admittedCond)
+		return admitWorkload(ctx, c, wl)
+	})
+
+	fmt.Printf("Checked %d pods\n", summary.TotalPods)
+	fmt.Printf("Failed %d pods\n", summary.FailedPods)
+	for e, pods := range summary.ErrorsForPods {
+		fmt.Printf("%dx: %s\n\t%v\n", len(pods), e, pods)
+	}
+	return errors.Join(summary.Errors...)
+}
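+
+// checkError reports whether a failed API call is worth retrying, whether the
+// object has to be reloaded before the retry, and how long to wait beforehand.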
+func checkError(err error) (retry, reload bool, timeout time.Duration) {
+	retrySeconds, retry := apierrors.SuggestsClientDelay(err)
+	if retry {
+		return true, false, time.Duration(retrySeconds) * time.Second
+	}
+
+	if apierrors.IsConflict(err) {
+		return true, true, 0
+	}
+	return false, false, 0
+}
+
+func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string) error {
+	p.Labels[controllerconstants.QueueLabel] = queue
+	p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+
+	err := c.Update(ctx, p)
+	retry, reload, timeout := checkError(err)
+
+	for retry {
+		if timeout >= 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		if reload {
+			err = c.Get(ctx, client.ObjectKeyFromObject(p), p)
+			if err != nil {
+				retry, reload, timeout = checkError(err)
+				continue
+			}
+			p.Labels[controllerconstants.QueueLabel] = queue
+			p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+		}
+		err = c.Update(ctx, p)
+		retry, reload, timeout = checkError(err)
+	}
+	return err
+}
+
+func createWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error {
+	err := c.Create(ctx, wl)
+	if apierrors.IsAlreadyExists(err) {
+		return nil
+	}
+	retry, _, timeout := checkError(err)
+	for retry {
+		if timeout >= 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		err = c.Create(ctx, wl)
+		retry, _, timeout = checkError(err)
+	}
+	return err
+}
+
+func admitWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error {
+	err := workload.ApplyAdmissionStatus(ctx, c, wl, false)
+	if apierrors.IsAlreadyExists(err) {
+		return nil
+	}
+	retry, _, timeout := checkError(err)
+	for retry {
+		if timeout >= 0 {
+			select {
+			case <-ctx.Done():
+				return errors.New("context canceled")
+			case <-time.After(timeout):
+			}
+		}
+		err = workload.ApplyAdmissionStatus(ctx, c, wl, false)
+		retry, _, timeout = checkError(err)
+	}
+	return err
+}
diff --git a/cmd/importer/pod/import_test.go b/cmd/importer/pod/import_test.go
new file mode 100644
index 0000000000..70a09bcee9
--- /dev/null
+++ b/cmd/importer/pod/import_test.go
@@ -0,0 +1,162 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package pod
+
+import (
+	"context"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
+
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/cmd/importer/util"
+	"sigs.k8s.io/kueue/pkg/controller/constants"
+	"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
+	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
+	testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
+)
+
+func TestImportNamespace(t *testing.T) {
+	basePodWrapper := testingpod.MakePod("pod", testingNamespace).
+		UID("pod").
+		Label(testingQueueLabel, "q1").
+		Image("img", nil).
+ Request(corev1.ResourceCPU, "1") + + baseWlWrapper := utiltesting.MakeWorkload("pod-pod-b17ab", testingNamespace). + ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "pod"). + Label(constants.JobUIDLabel, "pod"). + Finalizers(kueue.ResourceInUseFinalizerName). + Queue("lq1"). + PodSets(*utiltesting.MakePodSet("main", 1). + Image("img"). + Request(corev1.ResourceCPU, "1"). + Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq1"). + Assignment(corev1.ResourceCPU, "f1", "1"). + Obj()). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: "Imported into ClusterQueue cq1", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Imported", + Message: "Imported into ClusterQueue cq1", + }) + + baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1") + baseClusterQueue := utiltesting.MakeClusterQueue("cq1"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("f1").Resource(corev1.ResourceCPU, "1", "0").Obj()) + + podCmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + } + + wlCmpOpts := []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion"), + cmpopts.IgnoreFields(metav1.Condition{}, "ObservedGeneration", "LastTransitionTime"), + } + + cases := map[string]struct { + pods []corev1.Pod + clusteQueues []kueue.ClusterQueue + localQueues []kueue.LocalQueue + mapping map[string]string + + wantPods []corev1.Pod + wantWorkloads []kueue.Workload + wantError error + }{ + + "create one": { + pods: []corev1.Pod{ + *basePodWrapper.Clone().Obj(), + }, + mapping: map[string]string{ + "q1": "lq1", + }, + localQueues: []kueue.LocalQueue{ + *baseLocalQueue.Obj(), + }, + clusteQueues: []kueue.ClusterQueue{ + *baseClusterQueue.Obj(), + }, + + wantPods: []corev1.Pod{ + *basePodWrapper.Clone(). + Label(constants.QueueLabel, "lq1"). + Label(pod.ManagedLabelKey, pod.ManagedLabelValue). + Obj(), + }, + + wantWorkloads: []kueue.Workload{ + *baseWlWrapper.Clone().Obj(), + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + podsList := corev1.PodList{Items: tc.pods} + cqList := kueue.ClusterQueueList{Items: tc.clusteQueues} + lqList := kueue.LocalQueueList{Items: tc.localQueues} + + builder := utiltesting.NewClientBuilder(). + WithInterceptorFuncs(interceptor.Funcs{SubResourcePatch: utiltesting.TreatSSAAsStrategicMerge}).WithStatusSubresource(&kueue.Workload{}). 
+ WithLists(&podsList, &cqList, &lqList) + + client := builder.Build() + ctx := context.Background() + + mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping) + gotErr := Import(ctx, client, mpc, 8) + + if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + } + + err := client.List(ctx, &podsList) + if err != nil { + t.Errorf("Unexpected list pod error: %s", err) + } + if diff := cmp.Diff(tc.wantPods, podsList.Items, podCmpOpts...); diff != "" { + t.Errorf("Unexpected pods (-want/+got)\n%s", diff) + } + + wlList := kueue.WorkloadList{} + err = client.List(ctx, &wlList) + if err != nil { + t.Errorf("Unexpected list workloads error: %s", err) + } + if diff := cmp.Diff(tc.wantWorkloads, wlList.Items, wlCmpOpts...); diff != "" { + t.Errorf("Unexpected workloads (-want/+got)\n%s", diff) + } + + }) + } +} diff --git a/cmd/importer/util/util.go b/cmd/importer/util/util.go new file mode 100644 index 0000000000..c896071495 --- /dev/null +++ b/cmd/importer/util/util.go @@ -0,0 +1,220 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "context" + "errors" + "fmt" + "maps" + "slices" + "sync" + + corev1 "k8s.io/api/core/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + utilslices "sigs.k8s.io/kueue/pkg/util/slices" +) + +const ( + ListLength = 100 +) + +var ( + ErrNoMappingKey = errors.New("no mapping key found") + ErrNoMapping = errors.New("no mapping found") + ErrLQNotFound = errors.New("localqueue not found") + ErrCQNotFound = errors.New("clusterqueue not found") + ErrCQInvalid = errors.New("clusterqueue invalid") +) + +func ListOptions(namespace, queueLabel, continueToken string) []client.ListOption { + opts := []client.ListOption{ + client.InNamespace(namespace), + client.HasLabels{queueLabel}, + client.Limit(ListLength), + client.Continue(continueToken), + } + return opts +} + +type ImportCache struct { + Namespaces []string + QueueLabel string + Mapping map[string]string + LocalQueues map[string]map[string]*kueue.LocalQueue + ClusterQueues map[string]*kueue.ClusterQueue + ResourceFalvors map[string]*kueue.ResourceFlavor + PriorityClasses map[string]*schedulingv1.PriorityClass +} + +func LoadImportCache(ctx context.Context, c client.Client, namespaces []string, queueLabel string, mapping map[string]string) (*ImportCache, error) { + ret := ImportCache{ + Namespaces: slices.Clone(namespaces), + QueueLabel: queueLabel, + Mapping: maps.Clone(mapping), + LocalQueues: make(map[string]map[string]*kueue.LocalQueue), + } + + // get the cluster queues + cqList := &kueue.ClusterQueueList{} + if err := c.List(ctx, cqList); err != nil { + return nil, fmt.Errorf("loading cluster queues: %w", err) + } + ret.ClusterQueues = utilslices.ToRefMap(cqList.Items, func(cq *kueue.ClusterQueue) string { 
return cq.Name })
+
+	// get the local queues
+	for _, ns := range namespaces {
+		lqList := &kueue.LocalQueueList{}
+		if err := c.List(ctx, lqList, client.InNamespace(ns)); err != nil {
+			return nil, fmt.Errorf("loading local queues in namespace %s: %w", ns, err)
+		}
+		ret.LocalQueues[ns] = utilslices.ToRefMap(lqList.Items, func(lq *kueue.LocalQueue) string { return lq.Name })
+	}
+
+	// ResourceFlavors
+	rfList := &kueue.ResourceFlavorList{}
+	if err := c.List(ctx, rfList); err != nil {
+		return nil, fmt.Errorf("loading resource flavors: %w", err)
+	}
+	ret.ResourceFalvors = utilslices.ToRefMap(rfList.Items, func(rf *kueue.ResourceFlavor) string { return rf.Name })
+
+	// PriorityClasses
+	pcList := &schedulingv1.PriorityClassList{}
+	if err := c.List(ctx, pcList); err != nil {
+		return nil, fmt.Errorf("loading priority classes: %w", err)
+	}
+	ret.PriorityClasses = utilslices.ToRefMap(pcList.Items, func(pc *schedulingv1.PriorityClass) string { return pc.Name })
+	return &ret, nil
+}
+
+func (mappingCache *ImportCache) LocalQueue(p *corev1.Pod) (*kueue.LocalQueue, error) {
+	mappingKey, found := p.Labels[mappingCache.QueueLabel]
+	if !found {
+		return nil, ErrNoMappingKey
+	}
+
+	queueName, found := mappingCache.Mapping[mappingKey]
+	if !found {
+		return nil, fmt.Errorf("%s: %w", mappingKey, ErrNoMapping)
+	}
+
+	nqQueues, found := mappingCache.LocalQueues[p.Namespace]
+	if !found {
+		return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound)
+	}
+
+	lq, found := nqQueues[queueName]
+	if !found {
+		return nil, fmt.Errorf("%s: %w", queueName, ErrLQNotFound)
+	}
+	return lq, nil
+}
+
+func (mappingCache *ImportCache) ClusterQueue(p *corev1.Pod) (*kueue.ClusterQueue, error) {
+	lq, err := mappingCache.LocalQueue(p)
+	if err != nil {
+		return nil, err
+	}
+	queueName := string(lq.Spec.ClusterQueue)
+	cq, found := mappingCache.ClusterQueues[queueName]
+	if !found {
+		return nil, fmt.Errorf("cluster queue: %s: %w", queueName, ErrCQNotFound)
+	}
+	return cq, nil
+}
+
+func PushPods(ctx context.Context, c client.Client, namespaces []string, queueLabel string, ch chan<- corev1.Pod) error {
+	defer close(ch)
+	for _, ns := range namespaces {
+		lst := &corev1.PodList{}
+		log := ctrl.LoggerFrom(ctx).WithValues("namespace", ns)
+		log.V(3).Info("Begin pods list")
+		defer log.V(3).Info("End pods list")
+		page := 0
+		for {
+			err := c.List(ctx, lst, ListOptions(ns, queueLabel, lst.Continue)...)
+			if err != nil {
+				log.Error(err, "list")
+				return fmt.Errorf("listing pods in %s, page %d: %w", ns, page, err)
+			}
+
+			for _, p := range lst.Items {
+				// ignore deleted or in final Phase?
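+				// As written, nothing is filtered out here: every pod carrying the
+				// queue label is forwarded to the workers, including terminated ones.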
+ ch <- p + } + + page++ + if lst.Continue == "" { + log.V(2).Info("No more pods", "pages", page) + break + } + } + } + return nil +} + +type Result struct { + Pod string + Err error +} + +type ProcessSummary struct { + TotalPods int + FailedPods int + ErrorsForPods map[string][]string + Errors []error +} + +func ConcurrentProcessPod(ch <-chan corev1.Pod, jobs uint, f func(p *corev1.Pod) error) ProcessSummary { + wg := sync.WaitGroup{} + resultCh := make(chan Result) + + wg.Add(int(jobs)) + for i := 0; i < int(jobs); i++ { + go func() { + defer wg.Done() + for pod := range ch { + err := f(&pod) + resultCh <- Result{Pod: client.ObjectKeyFromObject(&pod).String(), Err: err} + } + }() + } + go func() { + wg.Wait() + close(resultCh) + }() + + ps := ProcessSummary{ + ErrorsForPods: make(map[string][]string), + } + for result := range resultCh { + ps.TotalPods++ + if result.Err != nil { + ps.FailedPods++ + estr := result.Err.Error() + if _, found := ps.ErrorsForPods[estr]; !found { + ps.Errors = append(ps.Errors, result.Err) + } + ps.ErrorsForPods[estr] = append(ps.ErrorsForPods[estr], result.Pod) + } + } + return ps +} diff --git a/go.mod b/go.mod index c3337d6b88..97614ebc6f 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/prometheus/client_golang v1.18.0 github.com/prometheus/client_model v0.6.0 github.com/ray-project/kuberay/ray-operator v1.1.0-rc.1 + github.com/spf13/cobra v1.8.0 + github.com/spf13/pflag v1.0.5 go.uber.org/zap v1.27.0 k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 @@ -82,8 +84,6 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/spf13/cobra v1.8.0 // indirect - github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect go.etcd.io/etcd/api/v3 v3.5.11 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.11 // indirect diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go index 92ccc216f1..d465781df9 100644 --- a/test/integration/framework/framework.go +++ b/test/integration/framework/framework.go @@ -85,7 +85,7 @@ func (f *Framework) Init() *rest.Config { return cfg } -func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) { +func (f *Framework) SetupClient(cfg *rest.Config) (context.Context, client.Client) { err := config.AddToScheme(scheme.Scheme) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) @@ -116,6 +116,13 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) gomega.ExpectWithOffset(1, k8sClient).NotTo(gomega.BeNil()) + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + + return ctx, k8sClient +} + +func (f *Framework) StartManager(ctx context.Context, cfg *rest.Config, managerSetup ManagerSetup) { webhookInstallOptions := &f.testEnv.WebhookInstallOptions mgrOpts := manager.Options{ Scheme: scheme.Scheme, @@ -132,8 +139,6 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con mgr, err := ctrl.NewManager(cfg, mgrOpts) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred(), "failed to create manager") - ctx, cancel := context.WithCancel(context.Background()) - f.cancel = cancel managerSetup(mgr, ctx) go func() { @@ -155,7 +160,11 @@ func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (con return nil }).Should(gomega.Succeed()) 
} +} +func (f *Framework) RunManager(cfg *rest.Config, managerSetup ManagerSetup) (context.Context, client.Client) { + ctx, k8sClient := f.SetupClient(cfg) + f.StartManager(ctx, cfg, managerSetup) return ctx, k8sClient } diff --git a/test/integration/importer/importer_test.go b/test/integration/importer/importer_test.go new file mode 100644 index 0000000000..544a7fec10 --- /dev/null +++ b/test/integration/importer/importer_test.go @@ -0,0 +1,155 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package importer + +import ( + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + importerpod "sigs.k8s.io/kueue/cmd/importer/pod" + importerutil "sigs.k8s.io/kueue/cmd/importer/util" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/metrics" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + utiltestingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + "sigs.k8s.io/kueue/test/util" +) + +var _ = ginkgo.Describe("Importer", func() { + var ( + ns *corev1.Namespace + flavor *kueue.ResourceFlavor + cq *kueue.ClusterQueue + lq *kueue.LocalQueue + ) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "import-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + flavor = utiltesting.MakeResourceFlavor("f1").Obj() + gomega.Expect(k8sClient.Create(ctx, flavor)).To(gomega.Succeed()) + + cq = utiltesting.MakeClusterQueue("cq1"). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("f1").Resource(corev1.ResourceCPU, "4").Obj(), + ). + Obj() + gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed()) + + lq = utiltesting.MakeLocalQueue("lq1", ns.Name).ClusterQueue("cq1").Obj() + gomega.Expect(k8sClient.Create(ctx, lq)).To(gomega.Succeed()) + + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cq, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor, true) + }) + + ginkgo.When("Kueue is started after import", func() { + ginkgo.It("Should keep the imported pods admitted", func() { + + pod1 := utiltestingpod.MakePod("pod1", ns.Name). + Label("src.lbl", "src-val"). + Request(corev1.ResourceCPU, "2"). + Obj() + pod2 := utiltestingpod.MakePod("pod2", ns.Name). + Label("src.lbl", "src-val"). + Request(corev1.ResourceCPU, "2"). 
+ Obj() + + ginkgo.By("Creating the initial pods", func() { + gomega.Expect(k8sClient.Create(ctx, pod1)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, pod2)).To(gomega.Succeed()) + }) + + ginkgo.By("Running the import", func() { + mapping, err := importerutil.LoadImportCache(ctx, k8sClient, []string{ns.Name}, "src.lbl", map[string]string{"src-val": "lq1"}) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(mapping).ToNot(gomega.BeNil()) + + gomega.Expect(importerpod.Check(ctx, k8sClient, mapping, 8)).To(gomega.Succeed()) + gomega.Expect(importerpod.Import(ctx, k8sClient, mapping, 8)).To(gomega.Succeed()) + }) + + wl1LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod1.Name, pod1.UID), Namespace: ns.Name} + wl2LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod2.Name, pod2.UID), Namespace: ns.Name} + wl1 := &kueue.Workload{} + wl2 := &kueue.Workload{} + + ginkgo.By("Checking the Workloads are created and admitted", func() { + gomega.Expect(k8sClient.Get(ctx, wl1LookupKey, wl1)).To(gomega.Succeed()) + gomega.Expect(k8sClient.Get(ctx, wl2LookupKey, wl2)).To(gomega.Succeed()) + + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2) + }) + + ginkgo.By("Starting kueue, the cluster queue status should account for the imported Workloads", func() { + fwk.StartManager(ctx, cfg, managerAndSchedulerSetup) + + util.ExpectClusterQueueStatusMetric(cq, metrics.CQStatusActive) + gomega.Eventually(func(g gomega.Gomega) { + updatedQueue := &kueue.ClusterQueue{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(cq), updatedQueue)).To(gomega.Succeed()) + g.Expect(updatedQueue.Status.AdmittedWorkloads).To(gomega.Equal(int32(2))) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + pod3 := utiltestingpod.MakePod("pod3", ns.Name). + Queue("lq1"). + Label("kueue.x-k8s.io/managed", "true"). + Request(corev1.ResourceCPU, "2"). + KueueSchedulingGate(). + Obj() + + ginkgo.By("Creating a new pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod3)).To(gomega.Succeed()) + }) + + wl3LookupKey := types.NamespacedName{Name: pod.GetWorkloadNameForPod(pod3.Name, pod3.UID), Namespace: ns.Name} + wl3 := &kueue.Workload{} + + ginkgo.By("Checking the Workload is created and pending while the old ones remain admitted", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wl3LookupKey, wl3)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + util.ExpectWorkloadsToBePending(ctx, k8sClient, wl3) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2) + }) + + ginkgo.By("By finishing an imported pod, the new one's Workload should be admitted", func() { + util.SetPodsPhase(ctx, k8sClient, corev1.PodSucceeded, pod2) + + util.ExpectWorkloadToFinish(ctx, k8sClient, wl2LookupKey) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl3) + }) + }) + }) + +}) diff --git a/test/integration/importer/suite_test.go b/test/integration/importer/suite_test.go new file mode 100644 index 0000000000..2d15c1b58d --- /dev/null +++ b/test/integration/importer/suite_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package importer + +import ( + "context" + "path/filepath" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + config "sigs.k8s.io/kueue/apis/config/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/core/indexer" + workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/scheduler" + "sigs.k8s.io/kueue/pkg/webhooks" + "sigs.k8s.io/kueue/test/integration/framework" + // +kubebuilder:scaffold:imports +) + +var ( + cfg *rest.Config + k8sClient client.Client + ctx context.Context + fwk *framework.Framework +) + +func TestScheduler(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + ginkgo.RunSpecs(t, + "Importer Suite", + ) +} + +var _ = ginkgo.BeforeSuite(func() { + fwk = &framework.Framework{ + CRDPath: filepath.Join("..", "..", "..", "config", "components", "crd", "bases"), + } + cfg = fwk.Init() + ctx, k8sClient = fwk.SetupClient(cfg) +}) + +var _ = ginkgo.AfterSuite(func() { + fwk.Teardown() +}) + +func managerAndSchedulerSetup(mgr manager.Manager, ctx context.Context) { + err := indexer.Setup(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + configuration := &config.Configuration{} + mgr.GetScheme().Default(configuration) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, configuration) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) + + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) + + err = workloadjob.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = pod.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + podReconciler := pod.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName)) + err = podReconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) + err = sched.Start(ctx) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) +} From 0e0b365efae17a57e4e7b4f058cdbf8337ff7b1d Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 19 Mar 2024 17:48:55 +0200 Subject: [PATCH 3/8] Review Remarks --- cmd/importer/main.go | 48 +++++++++++++++++++++++++++++++++----------- go.mod | 4 ++-- 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/cmd/importer/main.go b/cmd/importer/main.go index d448c6673a..eb76a7823b 100644 --- a/cmd/importer/main.go +++ b/cmd/importer/main.go @@ -19,10 +19,12 @@ package main import ( "context" "fmt" + "maps" "os" "github.com/spf13/cobra" "go.uber.org/zap/zapcore" + "gopkg.in/yaml.v2" 
"k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -35,17 +37,18 @@ import ( ) const ( - NamespaceFlag = "namespace" - NamespaceFlagShort = "n" - QueueMappingFlag = "queuemapping" - QueueLabelFlag = "queuelabel" - QPSFlag = "qps" - BurstFlag = "burst" - VerbosityFlag = "verbose" - VerboseFlagShort = "v" - JobsFlag = "jobs" - JobsFlagShort = "j" - DryRunFlag = "dry-run" + NamespaceFlag = "namespace" + NamespaceFlagShort = "n" + QueueMappingFlag = "queuemapping" + QueueMappingFileFlag = "queuemapping-file" + QueueLabelFlag = "queuelabel" + QPSFlag = "qps" + BurstFlag = "burst" + VerbosityFlag = "verbose" + VerboseFlagShort = "v" + JobsFlag = "jobs" + JobsFlagShort = "j" + DryRunFlag = "dry-run" ) var ( @@ -69,9 +72,10 @@ func setFlags(cmd *cobra.Command) { cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)") cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue") cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names") + cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") - cmd.Flags().UintP(JobsFlag, JobsFlagShort, 8, "number of concurrent jobs") + cmd.Flags().UintP(JobsFlag, JobsFlagShort, 8, "number of concurrent import workers") cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only") _ = cmd.MarkFlagRequired(QueueLabelFlag) @@ -112,10 +116,30 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) if err != nil { return nil, err } + mapping, err := flags.GetStringToString(QueueMappingFlag) if err != nil { return nil, err } + + mappingFile, err := flags.GetString(QueueMappingFileFlag) + if err != nil { + return nil, err + } + + if mappingFile != "" { + yamlFile, err := os.ReadFile(mappingFile) + if err != nil { + return nil, err + } + extraMapping := map[string]string{} + err = yaml.Unmarshal(yamlFile, extraMapping) + if err != nil { + return nil, fmt.Errorf("decoding %q: %w", mappingFile, err) + } + maps.Copy(mapping, extraMapping) + } + return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping) } diff --git a/go.mod b/go.mod index 97614ebc6f..fd247ccee8 100644 --- a/go.mod +++ b/go.mod @@ -15,8 +15,8 @@ require ( github.com/prometheus/client_model v0.6.0 github.com/ray-project/kuberay/ray-operator v1.1.0-rc.1 github.com/spf13/cobra v1.8.0 - github.com/spf13/pflag v1.0.5 go.uber.org/zap v1.27.0 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/apiserver v0.29.2 @@ -84,6 +84,7 @@ require ( github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect go.etcd.io/etcd/api/v3 v3.5.11 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.11 // indirect @@ -120,7 +121,6 @@ require ( google.golang.org/protobuf v1.32.0 // indirect 
gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.0 // indirect k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect From 42e1363c8d40faa991bf56cb6f7964556cb82238 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 19 Mar 2024 19:36:56 +0200 Subject: [PATCH 4/8] Add extra labels --- cmd/importer/main.go | 24 +++++++++++++++- cmd/importer/pod/check_test.go | 2 +- cmd/importer/pod/import.go | 7 +++-- cmd/importer/pod/import_test.go | 32 +++++++++++++++++++++- cmd/importer/util/util.go | 4 ++- test/integration/importer/importer_test.go | 2 +- 6 files changed, 64 insertions(+), 7 deletions(-) diff --git a/cmd/importer/main.go b/cmd/importer/main.go index eb76a7823b..640d51795f 100644 --- a/cmd/importer/main.go +++ b/cmd/importer/main.go @@ -18,6 +18,7 @@ package main import ( "context" + "errors" "fmt" "maps" "os" @@ -25,6 +26,7 @@ import ( "github.com/spf13/cobra" "go.uber.org/zap/zapcore" "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/validation" "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -49,6 +51,7 @@ const ( JobsFlag = "jobs" JobsFlagShort = "j" DryRunFlag = "dry-run" + AddLabelsFlag = "add-labels" ) var ( @@ -72,6 +75,7 @@ func setFlags(cmd *cobra.Command) { cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)") cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue") cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names") + cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods") cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") @@ -140,7 +144,25 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) maps.Copy(mapping, extraMapping) } - return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping) + addLabels, err := flags.GetStringToString(AddLabelsFlag) + if err != nil { + return nil, err + } + + var validationErrors []error + for name, value := range addLabels { + for _, err := range validation.IsQualifiedName(name) { + validationErrors = append(validationErrors, fmt.Errorf("name %q: %s", name, err)) + } + for _, err := range validation.IsValidLabelValue(value) { + validationErrors = append(validationErrors, fmt.Errorf("label %q value %q: %s", name, value, err)) + } + } + if len(validationErrors) > 0 { + return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...)) + } + + return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels) } func getKubeClient(cmd *cobra.Command) (client.Client, error) { diff --git a/cmd/importer/pod/check_test.go b/cmd/importer/pod/check_test.go index 975842e25f..d0e4e39c21 100644 --- a/cmd/importer/pod/check_test.go +++ b/cmd/importer/pod/check_test.go @@ 
-126,7 +126,7 @@ func TestCheckNamespace(t *testing.T) {
 			client := builder.Build()
 			ctx := context.Background()
 
-			mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping)
+			mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping, nil)
 			gotErr := Check(ctx, client, mpc, 8)
 
 			if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go
index db46d88e58..89d2b8b440 100644
--- a/cmd/importer/pod/import.go
+++ b/cmd/importer/pod/import.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"maps"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -58,7 +59,7 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs
 
 		oldLq, found := p.Labels[controllerconstants.QueueLabel]
 		if !found {
-			if err := addLabels(ctx, c, p, lq.Name); err != nil {
+			if err := addLabels(ctx, c, p, lq.Name, cache.AddLabels); err != nil {
 				return fmt.Errorf("cannot add queue label: %w", err)
 			}
 		} else if oldLq != lq.Name {
@@ -141,9 +142,10 @@ func checkError(err error) (retry, reload bool, timeout time.Duration) {
 	return false, false, 0
 }
 
-func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string) error {
+func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string, addLabels map[string]string) error {
 	p.Labels[controllerconstants.QueueLabel] = queue
 	p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+	maps.Copy(p.Labels, addLabels)
 
 	err := c.Update(ctx, p)
 	retry, reload, timeout := checkError(err)
@@ -164,6 +166,7 @@ func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string
 			}
 			p.Labels[controllerconstants.QueueLabel] = queue
 			p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
+			maps.Copy(p.Labels, addLabels)
 		}
 		err = c.Update(ctx, p)
 		retry, reload, timeout = checkError(err)
diff --git a/cmd/importer/pod/import_test.go b/cmd/importer/pod/import_test.go
index 70a09bcee9..64b541b3a8 100644
--- a/cmd/importer/pod/import_test.go
+++ b/cmd/importer/pod/import_test.go
@@ -87,6 +87,7 @@ func TestImportNamespace(t *testing.T) {
 		clusteQueues  []kueue.ClusterQueue
 		localQueues   []kueue.LocalQueue
 		mapping       map[string]string
+		addLabels     map[string]string
 
 		wantPods      []corev1.Pod
 		wantWorkloads []kueue.Workload
@@ -114,6 +115,35 @@ func TestImportNamespace(t *testing.T) {
 				Obj(),
 		},
 
+		wantWorkloads: []kueue.Workload{
+			*baseWlWrapper.Clone().Obj(),
+		},
+	},
+	"create one, add labels": {
+		pods: []corev1.Pod{
+			*basePodWrapper.Clone().Obj(),
+		},
+		mapping: map[string]string{
+			"q1": "lq1",
+		},
+		localQueues: []kueue.LocalQueue{
+			*baseLocalQueue.Obj(),
+		},
+		clusteQueues: []kueue.ClusterQueue{
+			*baseClusterQueue.Obj(),
+		},
+		addLabels: map[string]string{
+			"new.lbl": "val",
+		},
+
+		wantPods: []corev1.Pod{
+			*basePodWrapper.Clone().
+				Label(constants.QueueLabel, "lq1").
+				Label(pod.ManagedLabelKey, pod.ManagedLabelValue).
+				Label("new.lbl", "val").
+ Obj(), + }, + wantWorkloads: []kueue.Workload{ *baseWlWrapper.Clone().Obj(), }, @@ -133,7 +163,7 @@ func TestImportNamespace(t *testing.T) { client := builder.Build() ctx := context.Background() - mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping) + mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping, tc.addLabels) gotErr := Import(ctx, client, mpc, 8) if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { diff --git a/cmd/importer/util/util.go b/cmd/importer/util/util.go index c896071495..27248834c3 100644 --- a/cmd/importer/util/util.go +++ b/cmd/importer/util/util.go @@ -63,14 +63,16 @@ type ImportCache struct { ClusterQueues map[string]*kueue.ClusterQueue ResourceFalvors map[string]*kueue.ResourceFlavor PriorityClasses map[string]*schedulingv1.PriorityClass + AddLabels map[string]string } -func LoadImportCache(ctx context.Context, c client.Client, namespaces []string, queueLabel string, mapping map[string]string) (*ImportCache, error) { +func LoadImportCache(ctx context.Context, c client.Client, namespaces []string, queueLabel string, mapping, addLabels map[string]string) (*ImportCache, error) { ret := ImportCache{ Namespaces: slices.Clone(namespaces), QueueLabel: queueLabel, Mapping: maps.Clone(mapping), LocalQueues: make(map[string]map[string]*kueue.LocalQueue), + AddLabels: addLabels, } // get the cluster queues diff --git a/test/integration/importer/importer_test.go b/test/integration/importer/importer_test.go index 544a7fec10..c02028d03e 100644 --- a/test/integration/importer/importer_test.go +++ b/test/integration/importer/importer_test.go @@ -89,7 +89,7 @@ var _ = ginkgo.Describe("Importer", func() { }) ginkgo.By("Running the import", func() { - mapping, err := importerutil.LoadImportCache(ctx, k8sClient, []string{ns.Name}, "src.lbl", map[string]string{"src-val": "lq1"}) + mapping, err := importerutil.LoadImportCache(ctx, k8sClient, []string{ns.Name}, "src.lbl", map[string]string{"src-val": "lq1"}, nil) gomega.Expect(err).ToNot(gomega.HaveOccurred()) gomega.Expect(mapping).ToNot(gomega.BeNil()) From 09d7cec35579da3f9f9420362b00fd101c27c39f Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 19 Mar 2024 20:06:13 +0200 Subject: [PATCH 5/8] Review Remarks --- cmd/importer/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/importer/main.go b/cmd/importer/main.go index 640d51795f..0ebd3420f2 100644 --- a/cmd/importer/main.go +++ b/cmd/importer/main.go @@ -48,8 +48,8 @@ const ( BurstFlag = "burst" VerbosityFlag = "verbose" VerboseFlagShort = "v" - JobsFlag = "jobs" - JobsFlagShort = "j" + ConcurrencyFlag = "concurrent-workers" + ConcurrencyFlagShort = "c" DryRunFlag = "dry-run" AddLabelsFlag = "add-labels" ) @@ -79,7 +79,7 @@ func setFlags(cmd *cobra.Command) { cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") - cmd.Flags().UintP(JobsFlag, JobsFlagShort, 8, "number of concurrent import workers") + cmd.Flags().UintP(ConcurrencyFlag, 
ConcurrencyFlagShort, 8, "number of concurrent import workers") cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only") _ = cmd.MarkFlagRequired(QueueLabelFlag) @@ -198,7 +198,7 @@ func getKubeClient(cmd *cobra.Command) (client.Client, error) { func importCmd(cmd *cobra.Command, _ []string) error { log := ctrl.Log.WithName("import") ctx := ctrl.LoggerInto(context.Background(), log) - jobs, _ := cmd.Flags().GetUint(JobsFlag) + cWorkers, _ := cmd.Flags().GetUint(ConcurrencyFlag) c, err := getKubeClient(cmd) if err != nil { return err @@ -209,7 +209,7 @@ func importCmd(cmd *cobra.Command, _ []string) error { return err } - if err = pod.Check(ctx, c, cache, jobs); err != nil { + if err = pod.Check(ctx, c, cache, cWorkers); err != nil { return err } @@ -217,5 +217,5 @@ func importCmd(cmd *cobra.Command, _ []string) error { fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag) return nil } - return pod.Import(ctx, c, cache, jobs) + return pod.Import(ctx, c, cache, cWorkers) } From 32b896ba75a978ce3a112a8ea93d7d27bd79ce6d Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Wed, 20 Mar 2024 09:17:20 +0200 Subject: [PATCH 6/8] Review Remarks --- cmd/importer/README.md | 4 ++- cmd/importer/main.go | 2 +- cmd/importer/pod/check.go | 4 +-- cmd/importer/pod/import.go | 35 +++++++++++----------- cmd/importer/pod/import_test.go | 4 ++- cmd/importer/util/util.go | 12 +++++--- test/integration/importer/importer_test.go | 10 +++++++ 7 files changed, 44 insertions(+), 27 deletions(-) diff --git a/cmd/importer/README.md b/cmd/importer/README.md index 3f0d97c90d..99c207a61f 100644 --- a/cmd/importer/README.md +++ b/cmd/importer/README.md @@ -6,6 +6,8 @@ A tool able to import existing pods into kueue. The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration disabled. +For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to be created in the cluster; the importer's check stage verifies this and enumerates any missing objects. + ## Build From kueue source root run: @@ -37,4 +39,4 @@ After which, if `--dry-run=false` was specified, for each selected Pod the impor ```bash -./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val=user-queue --dry-run=false ``` - Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set. \ No newline at end of file + Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set.
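As a side note on the mapping flags documented above: the extra mappings file passed with `--queuemapping-file` is decoded with `gopkg.in/yaml.v2` and merged over the command-line mapping with `maps.Copy`. A minimal sketch of that decoding, assuming the file is a flat `labelValue: localQueueName` YAML map; the helper and file name below are illustrative, not the importer's actual code:

```go
// Sketch only: decode a --queuemapping-file style YAML map and merge it over
// the mapping given on the command line. Assumes a flat map such as
// "src-val: user-queue"; loadExtraMapping and mapping.yaml are hypothetical.
package main

import (
	"fmt"
	"maps"
	"os"

	"gopkg.in/yaml.v2"
)

func loadExtraMapping(path string, mapping map[string]string) error {
	data, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	extraMapping := map[string]string{}
	if err := yaml.Unmarshal(data, &extraMapping); err != nil {
		return fmt.Errorf("decoding %q: %w", path, err)
	}
	// Entries from the file overwrite identical keys from the command line,
	// mirroring the maps.Copy merge in loadMappingCache.
	maps.Copy(mapping, extraMapping)
	return nil
}

func main() {
	mapping := map[string]string{"src-val": "user-queue"}
	if err := loadExtraMapping("mapping.yaml", mapping); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(mapping)
}
```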
diff --git a/cmd/importer/main.go b/cmd/importer/main.go index 0ebd3420f2..ef44dd2258 100644 --- a/cmd/importer/main.go +++ b/cmd/importer/main.go @@ -75,7 +75,7 @@ func setFlags(cmd *cobra.Command) { cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)") cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue") cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names") - cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods") + cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads") cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") diff --git a/cmd/importer/pod/check.go b/cmd/importer/pod/check.go index 630cf9d453..a762413df9 100644 --- a/cmd/importer/pod/check.go +++ b/cmd/importer/pod/check.go @@ -65,8 +65,8 @@ func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs u return nil }) - fmt.Printf("Checked %d pods\n", summary.TotalPods) - fmt.Printf("Failed %d pods\n", summary.FailedPods) + log := ctrl.LoggerFrom(ctx) + log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods) for e, pods := range summary.ErrorsForPods { fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0]) } diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go index 89d2b8b440..3cddcaeee5 100644 --- a/cmd/importer/pod/import.go +++ b/cmd/importer/pod/import.go @@ -73,6 +73,8 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs return fmt.Errorf("construct workload: %w", err) } + maps.Copy(wl.Labels, cache.AddLabels) + if pc, found := cache.PriorityClasses[p.Spec.PriorityClassName]; found { wl.Spec.PriorityClassName = pc.Name wl.Spec.Priority = &pc.Value @@ -122,8 +124,8 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs return admitWorkload(ctx, c, wl) }) - fmt.Printf("Checked %d pods\n", summary.TotalPods) - fmt.Printf("Failed %d pods\n", summary.FailedPods) + log := ctrl.LoggerFrom(ctx) + log.Info("Import done", "checked", summary.TotalPods, "failed", summary.FailedPods) for e, pods := range summary.ErrorsForPods { fmt.Printf("%dx: %s\n\t%v", len(pods), e, pods) } @@ -148,20 +150,20 @@ func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string maps.Copy(p.Labels, addLabels) err := c.Update(ctx, p) - retry, reload, timeouut := checkError(err) + retry, reload, timeout := checkError(err) for retry { - if timeouut >= 0 { + if timeout >= 0 { select { case <-ctx.Done(): return errors.New("context canceled") - case <-time.After(timeouut): + case <-time.After(timeout): } } if reload { err = c.Get(ctx, client.ObjectKeyFromObject(p), p) if err != nil { - retry, reload, timeouut = checkError(err) + retry, reload, timeout = checkError(err) continue } p.Labels[controllerconstants.QueueLabel] = queue @@ 
-169,7 +171,7 @@ func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string maps.Copy(p.Labels, addLabels) } err = c.Update(ctx, p) - retry, reload, timeouut = checkError(err) + retry, reload, timeout = checkError(err) } return err } @@ -179,37 +181,34 @@ func createWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) er if apierrors.IsAlreadyExists(err) { return nil } - retry, _, timeouut := checkError(err) + retry, _, timeout := checkError(err) for retry { - if timeouut >= 0 { + if timeout >= 0 { select { case <-ctx.Done(): return errors.New("context canceled") - case <-time.After(timeouut): + case <-time.After(timeout): } } err = c.Create(ctx, wl) - retry, _, timeouut = checkError(err) + retry, _, timeout = checkError(err) } return err } func admitWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error { err := workload.ApplyAdmissionStatus(ctx, c, wl, false) - if apierrors.IsAlreadyExists(err) { - return nil - } - retry, _, timeouut := checkError(err) + retry, _, timeout := checkError(err) for retry { - if timeouut >= 0 { + if timeout >= 0 { select { case <-ctx.Done(): return errors.New("context canceled") - case <-time.After(timeouut): + case <-time.After(timeout): } } err = workload.ApplyAdmissionStatus(ctx, c, wl, false) - retry, _, timeouut = checkError(err) + retry, _, timeout = checkError(err) } return err } diff --git a/cmd/importer/pod/import_test.go b/cmd/importer/pod/import_test.go index 64b541b3a8..e16970fb21 100644 --- a/cmd/importer/pod/import_test.go +++ b/cmd/importer/pod/import_test.go @@ -145,7 +145,9 @@ func TestImportNamespace(t *testing.T) { }, wantWorkloads: []kueue.Workload{ - *baseWlWrapper.Clone().Obj(), + *baseWlWrapper.Clone(). + Label("new.lbl", "val"). + Obj(), }, }, } diff --git a/cmd/importer/util/util.go b/cmd/importer/util/util.go index 27248834c3..ffb4472a4e 100644 --- a/cmd/importer/util/util.go +++ b/cmd/importer/util/util.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" schedulingv1 "k8s.io/api/scheduling/v1" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,7 +46,7 @@ var ( ErrCQInvalid = errors.New("clusterqueue invalid") ) -func ListOptions(namespace, queueLabel, continueToken string) []client.ListOption { +func listOptions(namespace, queueLabel, continueToken string) []client.ListOption { opts := []client.ListOption{ client.InNamespace(namespace), client.HasLabels{queueLabel}, @@ -152,15 +153,18 @@ func PushPods(ctx context.Context, c client.Client, namespaces []string, queueLa defer log.V(3).Info("End pods list") page := 0 for { - err := c.List(ctx, lst, ListOptions(ns, queueLabel, lst.Continue)...) + err := c.List(ctx, lst, listOptions(ns, queueLabel, lst.Continue)...) if err != nil { log.Error(err, "list") return fmt.Errorf("listing pods in %s, page %d: %w", ns, page, err) } for _, p := range lst.Items { - // ignore deleted or in final Phase? 
- ch <- p + if p.Status.Phase == corev1.PodFailed || p.Status.Phase == corev1.PodSucceeded { + log.V(2).Info("Skip pod", "pod", klog.KObj(&p), "phase", p.Status.Phase) + } else { + ch <- p + } } page++ diff --git a/test/integration/importer/importer_test.go b/test/integration/importer/importer_test.go index c02028d03e..1f0a9c9bab 100644 --- a/test/integration/importer/importer_test.go +++ b/test/integration/importer/importer_test.go @@ -109,6 +109,9 @@ var _ = ginkgo.Describe("Importer", func() { util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2) }) + wl1UID := wl1.UID + wl2UID := wl2.UID + ginkgo.By("Starting kueue, the cluster queue status should account for the imported Workloads", func() { fwk.StartManager(ctx, cfg, managerAndSchedulerSetup) @@ -149,6 +152,13 @@ var _ = ginkgo.Describe("Importer", func() { util.ExpectWorkloadToFinish(ctx, k8sClient, wl2LookupKey) util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl3) }) + + ginkgo.By("Checking the imported Workloads are not recreated", func() { + gomega.Expect(k8sClient.Get(ctx, wl1LookupKey, wl1)).To(gomega.Succeed()) + gomega.Expect(wl1.UID).To(gomega.Equal(wl1UID)) + gomega.Expect(k8sClient.Get(ctx, wl2LookupKey, wl2)).To(gomega.Succeed()) + gomega.Expect(wl2.UID).To(gomega.Equal(wl2UID)) + }) }) }) From 37dd2ce285a71966fbfe29eaa3a8689f436964fc Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Wed, 20 Mar 2024 16:23:46 +0200 Subject: [PATCH 7/8] Review Remarks --- cmd/importer/README.md | 10 +++++----- cmd/importer/pod/check.go | 2 +- cmd/importer/pod/import.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/importer/README.md b/cmd/importer/README.md index 99c207a61f..5667392e5e 100644 --- a/cmd/importer/README.md +++ b/cmd/importer/README.md @@ -4,7 +4,7 @@ A tool able to import existing pods into kueue. ## Cluster setup. -The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration disabled. +The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration framework disabled. Check Kueue's [installation guide](https://kueue.sigs.k8s.io/docs/installation/) and [Run Plain Pods](https://kueue.sigs.k8s.io/docs/tasks/run_plain_pods/#before-you-begin) for details. For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to be created in the cluster; the importer's check stage verifies this and enumerates any missing objects. @@ -20,11 +20,11 @@ go build -C cmd/importer/ -o $(pwd)/bin/importer The command runs against the systems default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to Configure Access to Multiple Clusters. -The will perform following checks: +The importer will perform the following checks: - At least one `namespace` is provided. - The label key (`queuelabel`) providing the queue mapping is provided. -- A mapping from ane of the encountered `queuelabel` values to an existing LocalQueue exists. +- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists. - The LocalQueues involved in the import are using an existing ClusterQueue. - The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor.
This ResourceFlavor is used when the importer creates the admission for the created workloads. @@ -37,6 +37,6 @@ After which, if `--dry-run=false` was specified, for each selected Pod the impor ### Example ```bash -./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val=user-queue --dry-run=false +./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false ``` - Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set. + Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set, into LocalQueue `user-queue` or `user-queue2` depending on the `src.lbl` value. diff --git a/cmd/importer/pod/check.go b/cmd/importer/pod/check.go index a762413df9..33b2e93fac 100644 --- a/cmd/importer/pod/check.go +++ b/cmd/importer/pod/check.go @@ -68,7 +68,7 @@ func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs u log := ctrl.LoggerFrom(ctx) log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods) for e, pods := range summary.ErrorsForPods { - fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0]) + log.Info("Error", "err", e, "occurrences", len(pods), "observedFirstIn", pods[0]) } return errors.Join(summary.Errors...) } diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go index 3cddcaeee5..2ac170f76c 100644 --- a/cmd/importer/pod/import.go +++ b/cmd/importer/pod/import.go @@ -127,7 +127,7 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs log := ctrl.LoggerFrom(ctx) log.Info("Import done", "checked", summary.TotalPods, "failed", summary.FailedPods) for e, pods := range summary.ErrorsForPods { - fmt.Printf("%dx: %s\n\t%v", len(pods), e, pods) + log.Info("Error", "err", e, "occurrences", len(pods), "observedFirstIn", pods[0]) } return errors.Join(summary.Errors...)
} From e6e41fbf12a9c19a6ce0a394c3e03f5bab179803 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Wed, 20 Mar 2024 17:46:05 +0200 Subject: [PATCH 8/8] Review Remarks --- cmd/importer/pod/check.go | 13 ++++++------- cmd/importer/pod/import.go | 8 ++++++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/cmd/importer/pod/check.go b/cmd/importer/pod/check.go index 33b2e93fac..e782d413bd 100644 --- a/cmd/importer/pod/check.go +++ b/cmd/importer/pod/check.go @@ -54,21 +54,20 @@ func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs u return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid) } - rf := string(cq.Spec.ResourceGroups[0].Flavors[0].Name) - if _, found := cache.ResourceFalvors[rf]; !found { - return fmt.Errorf("%q flavor %q: %w", cq.Name, rf, util.ErrCQInvalid) + rfName := string(cq.Spec.ResourceGroups[0].Flavors[0].Name) + rf, rfFound := cache.ResourceFalvors[rfName] + if !rfFound { + return fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid) } - // do some additional checks like: - // - (maybe) the resources managed by the queues - + log.V(2).Info("Successfully checked", "pod", klog.KObj(p), "clusterQueue", klog.KObj(cq), "resourceFlavor", klog.KObj(rf)) return nil }) log := ctrl.LoggerFrom(ctx) log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods) for e, pods := range summary.ErrorsForPods { - log.Info("Error", "err", e, "occurrences", len(pods), "observedFirstIn", pods[0]) + log.Info("Validation failed for Pods", "err", e, "occurrences", len(pods), "observedFirstIn", pods[0]) } return errors.Join(summary.Errors...) } diff --git a/cmd/importer/pod/import.go b/cmd/importer/pod/import.go index 2ac170f76c..06f5f73f7c 100644 --- a/cmd/importer/pod/import.go +++ b/cmd/importer/pod/import.go @@ -121,13 +121,17 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs Message: fmt.Sprintf("Imported into ClusterQueue %s", cq.Name), } apimeta.SetStatusCondition(&wl.Status.Conditions, admittedCond) - return admitWorkload(ctx, c, wl) + if err := admitWorkload(ctx, c, wl); err != nil { + return err + } + log.V(2).Info("Successfully imported", "pod", klog.KObj(p), "workload", klog.KObj(wl)) + return nil }) log := ctrl.LoggerFrom(ctx) log.Info("Import done", "checked", summary.TotalPods, "failed", summary.FailedPods) for e, pods := range summary.ErrorsForPods { - log.Info("Error", "err", e, "occurrences", len(pods), "observedFirstIn", pods[0]) + log.Info("Import failed for Pods", "err", e, "occurrences", len(pods), "observedFirstIn", pods[0]) } return errors.Join(summary.Errors...) }
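Across these patches, `addLabels`, `createWorkload` and `admitWorkload` all keep the same retry shape around `checkError`: classify the error into retry/reload/timeout, optionally wait, optionally re-read the object, then re-attempt the mutation. A condensed, self-contained sketch of that loop, with a hypothetical `classify` standing in for the importer's `checkError` and a fake mutation in place of the client calls:

```go
// Sketch of the retry loop shared by addLabels, createWorkload and
// admitWorkload. classify() is a hypothetical stand-in for checkError();
// the real classification is derived from the Kubernetes API error.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient")

// classify mirrors checkError's contract: whether to retry, whether to
// re-read the object first, and how long to wait before the next attempt
// (a negative timeout would skip the wait).
func classify(err error) (retry, reload bool, timeout time.Duration) {
	if errors.Is(err, errTransient) {
		return true, true, 10 * time.Millisecond
	}
	return false, false, 0
}

func withRetry(ctx context.Context, reloadFn, mutateFn func() error) error {
	err := mutateFn()
	retry, reload, timeout := classify(err)
	for retry {
		if timeout >= 0 {
			select {
			case <-ctx.Done():
				return errors.New("context canceled")
			case <-time.After(timeout):
			}
		}
		if reload {
			// On conflict-like errors the object is re-read (and, in
			// addLabels, relabeled) before the update is retried.
			if err = reloadFn(); err != nil {
				retry, reload, timeout = classify(err)
				continue
			}
		}
		err = mutateFn()
		retry, reload, timeout = classify(err)
	}
	return err
}

func main() {
	attempts := 0
	mutate := func() error {
		attempts++
		if attempts < 3 {
			return errTransient // fail twice, then succeed
		}
		return nil
	}
	reload := func() error { return nil }
	fmt.Println(withRetry(context.Background(), reload, mutate), "after", attempts, "attempts")
}
```

The `reload` branch is what distinguishes `addLabels` from the two Workload helpers: a conflicting Pod update forces a fresh `Get` and re-application of the queue and managed labels before the next `Update`, while workload creation and admission simply re-attempt the same call.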