Skip to content

Commit

Permalink
Add pod importer (kubernetes-sigs#1825)
Browse files Browse the repository at this point in the history
* [pod] Export `FromObject()`

* [cmd/importer] Initial implementation.

* Review Remarks

* Add extra labels

* Review Remarks

* Review Remarks

* Review Remarks

* Review Remarks
  • Loading branch information
trasc authored and vsoch committed Apr 18, 2024
1 parent dfddc64 commit 45458ce
Show file tree
Hide file tree
Showing 14 changed files with 1,403 additions and 16 deletions.
42 changes: 42 additions & 0 deletions cmd/importer/README.md
@@ -0,0 +1,42 @@
# Kueue Importer Tool

A tool able to import existing pods into kueue.

## Cluster setup.

The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration framework disabled. Check Kueue's [installation guide](https://kueue.sigs.k8s.io/docs/installation/) and [Run Plain Pods](https://kueue.sigs.k8s.io/docs/tasks/run_plain_pods/#before-you-begin) for details.

For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to be created in the cluster, the check stage of the importer will check this and enumerate the missing objects.

## Build

From kueue source root run:
```bash
go build -C cmd/importer/ -o $(pwd)/bin/importer

```

## Usage

The command runs against the systems default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to Configure Access to Multiple Clusters.

The importer will perform following checks:

- At least one `namespace` is provided.
- The label key (`queuelabel`) providing the queue mapping is provided.
- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists.
- The LocalQueues involved in the import are using an existing ClusterQueue.
- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads.

After which, if `--dry-run=false` was specified, for each selected Pod the importer will:

- Update the Pod's Kueue related labels.
- Create a Workload associated with the Pod.
- Admit the Workload.

### Example

```bash
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false
```
Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set in LocalQueues `user-queue` or `user-queue2` depending on `src.lbl` value.
221 changes: 221 additions & 0 deletions cmd/importer/main.go
@@ -0,0 +1,221 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"errors"
"fmt"
"maps"
"os"

"github.com/spf13/cobra"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/pod"
"sigs.k8s.io/kueue/cmd/importer/util"
"sigs.k8s.io/kueue/pkg/util/useragent"
)

const (
NamespaceFlag = "namespace"
NamespaceFlagShort = "n"
QueueMappingFlag = "queuemapping"
QueueMappingFileFlag = "queuemapping-file"
QueueLabelFlag = "queuelabel"
QPSFlag = "qps"
BurstFlag = "burst"
VerbosityFlag = "verbose"
VerboseFlagShort = "v"
ConcurrencyFlag = "concurrent-workers"
ConcurrencyFlagShort = "c"
DryRunFlag = "dry-run"
AddLabelsFlag = "add-labels"
)

var (
rootCmd = &cobra.Command{
Use: "importer",
Short: "Import existing (running) objects into Kueue",
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
v, _ := cmd.Flags().GetCount(VerbosityFlag)
level := (v + 1) * -1
ctrl.SetLogger(zap.New(
zap.UseDevMode(true),
zap.ConsoleEncoder(),
zap.Level(zapcore.Level(level)),
))
return nil
},
}
)

func setFlags(cmd *cobra.Command) {
cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)")
cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue")
cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names")
cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads")
cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names")
cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers")
cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only")

_ = cmd.MarkFlagRequired(QueueLabelFlag)
_ = cmd.MarkFlagRequired(NamespaceFlag)
}

func init() {
rootCmd.AddGroup(&cobra.Group{
ID: "pod",
Title: "Pods import",
})
rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)")

importCmd := &cobra.Command{
Use: "import",
GroupID: "pod",
Short: "Checks the prerequisites and import pods.",
RunE: importCmd,
}
setFlags(importCmd)
rootCmd.AddCommand(importCmd)
}

func main() {
err := rootCmd.Execute()
if err != nil {
os.Exit(1)
}
}

func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) (*util.ImportCache, error) {
flags := cmd.Flags()
namespaces, err := flags.GetStringSlice(NamespaceFlag)
if err != nil {
return nil, err
}
queueLabel, err := flags.GetString(QueueLabelFlag)
if err != nil {
return nil, err
}

mapping, err := flags.GetStringToString(QueueMappingFlag)
if err != nil {
return nil, err
}

mappingFile, err := flags.GetString(QueueMappingFileFlag)
if err != nil {
return nil, err
}

if mappingFile != "" {
yamlFile, err := os.ReadFile(mappingFile)
if err != nil {
return nil, err
}
extraMapping := map[string]string{}
err = yaml.Unmarshal(yamlFile, extraMapping)
if err != nil {
return nil, fmt.Errorf("decoding %q: %w", mappingFile, err)
}
maps.Copy(mapping, extraMapping)
}

addLabels, err := flags.GetStringToString(AddLabelsFlag)
if err != nil {
return nil, err
}

var validationErrors []error
for name, value := range addLabels {
for _, err := range validation.IsQualifiedName(name) {
validationErrors = append(validationErrors, fmt.Errorf("name %q: %s", name, err))
}
for _, err := range validation.IsValidLabelValue(value) {
validationErrors = append(validationErrors, fmt.Errorf("label %q value %q: %s", name, value, err))
}
}
if len(validationErrors) > 0 {
return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...))
}

return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels)
}

func getKubeClient(cmd *cobra.Command) (client.Client, error) {
kubeConfig, err := ctrl.GetConfig()
if err != nil {
return nil, err
}
if kubeConfig.UserAgent == "" {
kubeConfig.UserAgent = useragent.Default()
}
qps, err := cmd.Flags().GetFloat32(QPSFlag)
if err != nil {
return nil, err
}
kubeConfig.QPS = qps
bust, err := cmd.Flags().GetInt(BurstFlag)
if err != nil {
return nil, err
}
kubeConfig.Burst = bust

if err := kueue.AddToScheme(scheme.Scheme); err != nil {
return nil, err
}

c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return nil, err
}
return c, nil
}

func importCmd(cmd *cobra.Command, _ []string) error {
log := ctrl.Log.WithName("import")
ctx := ctrl.LoggerInto(context.Background(), log)
cWorkers, _ := cmd.Flags().GetUint(ConcurrencyFlag)
c, err := getKubeClient(cmd)
if err != nil {
return err
}

cache, err := loadMappingCache(ctx, c, cmd)
if err != nil {
return err
}

if err = pod.Check(ctx, c, cache, cWorkers); err != nil {
return err
}

if dr, _ := cmd.Flags().GetBool(DryRunFlag); dr {
fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag)
return nil
}
return pod.Import(ctx, c, cache, cWorkers)
}
73 changes: 73 additions & 0 deletions cmd/importer/pod/check.go
@@ -0,0 +1,73 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"context"
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/kueue/cmd/importer/util"
)

func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
ch := make(chan corev1.Pod)
go func() {
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
if err != nil {
ctrl.LoggerFrom(ctx).Error(err, "Listing pods")
}
}()
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Checking")

cq, err := cache.ClusterQueue(p)
if err != nil {
return err
}

if len(cq.Spec.ResourceGroups) == 0 {
return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid)
}

if len(cq.Spec.ResourceGroups[0].Flavors) == 0 {
return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid)
}

rfName := string(cq.Spec.ResourceGroups[0].Flavors[0].Name)
rf, rfFound := cache.ResourceFalvors[rfName]
if !rfFound {
return fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid)
}

log.V(2).Info("Successfully checked", "pod", klog.KObj(p), "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf))
return nil
})

log := ctrl.LoggerFrom(ctx)
log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
log.Info("Validation failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0])
}
return errors.Join(summary.Errors...)
}

0 comments on commit 45458ce

Please sign in to comment.