Add pod importer #1825

Merged
merged 8 commits into from Mar 20, 2024
42 changes: 42 additions & 0 deletions cmd/importer/README.md
@@ -0,0 +1,42 @@
# Kueue Importer Tool

A tool for importing existing Pods into Kueue.

## Cluster setup

The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration disabled.

For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to be created in the cluster. The check stage of the importer verifies this and enumerates the missing objects.

## Build

From the Kueue source root, run:
```bash
go build -C cmd/importer/ -o $(pwd)/bin/importer
```

## Usage

The command runs against the system's default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn how to configure access to multiple clusters.

The importer will perform the following checks:

- At least one `namespace` is provided.
- The label key (`queuelabel`) providing the queue mapping is provided.
- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists.
- The LocalQueues involved in the import are using an existing ClusterQueue.
- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads.
Contributor

Why at least one? What will happen if it uses non-existing ResourceFlavors?
We should probably at least log a warning.

Contributor Author

If the first resource flavor is not created, the check will fail.
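
The behavior described in this reply (only the first flavor of the first resource group is considered, and the check fails if it does not exist) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the importer's code: `clusterQueue`, `resourceGroup`, `errCQInvalid` and `firstFlavor` are simplified, hypothetical stand-ins for the Kueue API types and `util.ErrCQInvalid`.

```go
package main

import (
	"errors"
	"fmt"
)

// errCQInvalid is a hypothetical stand-in for util.ErrCQInvalid.
var errCQInvalid = errors.New("invalid cluster queue")

// resourceGroup and clusterQueue are simplified stand-ins for the Kueue API
// types; only the fields read by the check are modeled.
type resourceGroup struct {
	Flavors []string
}

type clusterQueue struct {
	Name           string
	ResourceGroups []resourceGroup
}

// firstFlavor returns the first flavor of the first resource group and fails
// when that flavor is not among the existing ResourceFlavors. Flavors listed
// after the first one are never consulted.
func firstFlavor(cq clusterQueue, existing map[string]bool) (string, error) {
	if len(cq.ResourceGroups) == 0 {
		return "", fmt.Errorf("%q has no resource groups: %w", cq.Name, errCQInvalid)
	}
	if len(cq.ResourceGroups[0].Flavors) == 0 {
		return "", fmt.Errorf("%q has no resource group flavors: %w", cq.Name, errCQInvalid)
	}
	rf := cq.ResourceGroups[0].Flavors[0]
	if !existing[rf] {
		return "", fmt.Errorf("%q flavor %q: %w", cq.Name, rf, errCQInvalid)
	}
	return rf, nil
}

func main() {
	// The second flavor exists, but the first one does not, so the check fails.
	cq := clusterQueue{
		Name:           "cq1",
		ResourceGroups: []resourceGroup{{Flavors: []string{"missing", "default"}}},
	}
	_, err := firstFlavor(cq, map[string]bool{"default": true})
	fmt.Println(err)
}
```

In this sketch a ClusterQueue whose second flavor exists still fails the check, which matches the reply above.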


After the checks pass, if `--dry-run=false` was specified, for each selected Pod the importer will:

- Update the Pod's Kueue-related labels.
- Create a Workload associated with the Pod.
- Admit the Workload.
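
The ordering described above (the check stage always runs, the import stage runs only when dry-run is disabled) can be sketched as follows. This is a minimal illustration rather than the importer's code: `run`, `check` and `importPods` are hypothetical names standing in for the real stages.

```go
package main

import "fmt"

// run executes the check stage first and stops on its error. The import stage
// runs only when dryRun is false. check and importPods are hypothetical
// callbacks standing in for the importer's stages.
func run(dryRun bool, check, importPods func() error) error {
	if err := check(); err != nil {
		return err
	}
	if dryRun {
		// Dry-run is the default: report success without changing anything.
		return nil
	}
	return importPods()
}

func main() {
	check := func() error { fmt.Println("checking"); return nil }
	importPods := func() error { fmt.Println("importing"); return nil }

	// With dryRun=true only the check stage runs.
	_ = run(true, check, importPods)
	// With dryRun=false the import stage follows a successful check.
	_ = run(false, check, importPods)
}
```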

### Example

```bash
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val=user-queue --dry-run=false
Contributor

Why queuemapping has two identical parameters with src-val label, which was not specified in --queuelabel?

Contributor Author

Why queuemapping has two identical parameters with src-val label

fixed

```
This imports all the Pods in namespaces `ns1` and `ns2` that have the label `src.lbl` set.
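
How a Pod's `queuelabel` value is translated into a LocalQueue name through `--queuemapping` can be illustrated with a small sketch. It assumes that Pods without the label are simply not selected and that a label value without a mapping is an error. `resolveLocalQueue` and `errNoMapping` are hypothetical names, not part of the importer.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoMapping is a hypothetical sentinel error used only in this sketch.
var errNoMapping = errors.New("no queue mapping found")

// resolveLocalQueue looks up the pod's value for queueLabel and translates it
// to a LocalQueue name through mapping. The boolean result reports whether the
// pod carries the label at all.
func resolveLocalQueue(podLabels map[string]string, queueLabel string, mapping map[string]string) (string, bool, error) {
	value, selected := podLabels[queueLabel]
	if !selected {
		return "", false, nil
	}
	queue, found := mapping[value]
	if !found {
		return "", true, fmt.Errorf("label %s=%s: %w", queueLabel, value, errNoMapping)
	}
	return queue, true, nil
}

func main() {
	// Corresponds to --queuelabel=src.lbl --queuemapping=src-val=user-queue.
	mapping := map[string]string{"src-val": "user-queue"}
	podLabels := map[string]string{"src.lbl": "src-val"}

	queue, selected, err := resolveLocalQueue(podLabels, "src.lbl", mapping)
	fmt.Println(queue, selected, err)
}
```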
221 changes: 221 additions & 0 deletions cmd/importer/main.go
@@ -0,0 +1,221 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"context"
	"errors"
	"fmt"
	"maps"
	"os"

	"github.com/spf13/cobra"
	"go.uber.org/zap/zapcore"
	"gopkg.in/yaml.v2"
	"k8s.io/apimachinery/pkg/util/validation"
	"k8s.io/client-go/kubernetes/scheme"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/cmd/importer/pod"
	"sigs.k8s.io/kueue/cmd/importer/util"
	"sigs.k8s.io/kueue/pkg/util/useragent"
)

const (
	NamespaceFlag        = "namespace"
	NamespaceFlagShort   = "n"
	QueueMappingFlag     = "queuemapping"
	QueueMappingFileFlag = "queuemapping-file"
	QueueLabelFlag       = "queuelabel"
	QPSFlag              = "qps"
	BurstFlag            = "burst"
	VerbosityFlag        = "verbose"
	VerboseFlagShort     = "v"
	ConcurrencyFlag      = "concurrent-workers"
	ConcurrencyFlagShort = "c"
	DryRunFlag           = "dry-run"
	AddLabelsFlag        = "add-labels"
)

var (
	rootCmd = &cobra.Command{
		Use:   "importer",
		Short: "Import existing (running) objects into Kueue",
		PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
			v, _ := cmd.Flags().GetCount(VerbosityFlag)
			level := (v + 1) * -1
			ctrl.SetLogger(zap.New(
				zap.UseDevMode(true),
				zap.ConsoleEncoder(),
				zap.Level(zapcore.Level(level)),
			))
			return nil
		},
	}
)

func setFlags(cmd *cobra.Command) {
	cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)")
	cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue")
	cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names")
	cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads")
	cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names")
	cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
	cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
	cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers")
	cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only")

	_ = cmd.MarkFlagRequired(QueueLabelFlag)
	_ = cmd.MarkFlagRequired(NamespaceFlag)
}

func init() {
	rootCmd.AddGroup(&cobra.Group{
		ID:    "pod",
		Title: "Pods import",
	})
	rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)")

	importCmd := &cobra.Command{
		Use:     "import",
		GroupID: "pod",
		Short:   "Checks the prerequisites and imports pods.",
		RunE:    importCmd,
	}
	setFlags(importCmd)
	rootCmd.AddCommand(importCmd)
}

func main() {
	err := rootCmd.Execute()
	if err != nil {
		os.Exit(1)
	}
}

func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) (*util.ImportCache, error) {
	flags := cmd.Flags()
	namespaces, err := flags.GetStringSlice(NamespaceFlag)
	if err != nil {
		return nil, err
	}
	queueLabel, err := flags.GetString(QueueLabelFlag)
	if err != nil {
		return nil, err
	}

	mapping, err := flags.GetStringToString(QueueMappingFlag)
	if err != nil {
		return nil, err
	}

	mappingFile, err := flags.GetString(QueueMappingFileFlag)
	if err != nil {
		return nil, err
	}

	if mappingFile != "" {
		yamlFile, err := os.ReadFile(mappingFile)
		if err != nil {
			return nil, err
		}
		extraMapping := map[string]string{}
		err = yaml.Unmarshal(yamlFile, extraMapping)
		if err != nil {
			return nil, fmt.Errorf("decoding %q: %w", mappingFile, err)
		}
		maps.Copy(mapping, extraMapping)
	}

	addLabels, err := flags.GetStringToString(AddLabelsFlag)
	if err != nil {
		return nil, err
	}

	var validationErrors []error
	for name, value := range addLabels {
		for _, err := range validation.IsQualifiedName(name) {
			validationErrors = append(validationErrors, fmt.Errorf("name %q: %s", name, err))
		}
		for _, err := range validation.IsValidLabelValue(value) {
			validationErrors = append(validationErrors, fmt.Errorf("label %q value %q: %s", name, value, err))
		}
	}
	if len(validationErrors) > 0 {
		return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...))
	}

	return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels)
}

func getKubeClient(cmd *cobra.Command) (client.Client, error) {
	kubeConfig, err := ctrl.GetConfig()
	if err != nil {
		return nil, err
	}
	if kubeConfig.UserAgent == "" {
		kubeConfig.UserAgent = useragent.Default()
	}
	qps, err := cmd.Flags().GetFloat32(QPSFlag)
	if err != nil {
		return nil, err
	}
	kubeConfig.QPS = qps
	burst, err := cmd.Flags().GetInt(BurstFlag)
	if err != nil {
		return nil, err
	}
	kubeConfig.Burst = burst

	if err := kueue.AddToScheme(scheme.Scheme); err != nil {
		return nil, err
	}

	c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme})
	if err != nil {
		return nil, err
	}
	return c, nil
}

func importCmd(cmd *cobra.Command, _ []string) error {
	log := ctrl.Log.WithName("import")
	ctx := ctrl.LoggerInto(context.Background(), log)
	cWorkers, _ := cmd.Flags().GetUint(ConcurrencyFlag)
	c, err := getKubeClient(cmd)
	if err != nil {
		return err
	}

	cache, err := loadMappingCache(ctx, c, cmd)
	if err != nil {
		return err
	}

	if err = pod.Check(ctx, c, cache, cWorkers); err != nil {
		return err
	}

	if dr, _ := cmd.Flags().GetBool(DryRunFlag); dr {
		fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag)
		return nil
	}
	return pod.Import(ctx, c, cache, cWorkers)
}
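
The way `loadMappingCache` combines the `--queuemapping` flag with the entries read from `--queuemapping-file` can be illustrated in isolation. The sketch below uses only the standard library and skips the YAML decoding. `mergeMappings` is a hypothetical helper, and unlike the code above it copies into a fresh map instead of modifying the flag's map in place.

```go
package main

import (
	"fmt"
	"maps"
)

// mergeMappings returns the effective label-value to LocalQueue mapping.
// Entries from the file are copied last, so they overwrite flag entries that
// use the same label value.
func mergeMappings(fromFlag, fromFile map[string]string) map[string]string {
	merged := make(map[string]string, len(fromFlag)+len(fromFile))
	maps.Copy(merged, fromFlag)
	maps.Copy(merged, fromFile)
	return merged
}

func main() {
	fromFlag := map[string]string{"team-a": "queue-a", "team-b": "queue-b"}
	fromFile := map[string]string{"team-b": "queue-b2", "team-c": "queue-c"}

	merged := mergeMappings(fromFlag, fromFile)
	fmt.Println(len(merged), merged["team-b"])
}
```

Because `maps.Copy` overwrites existing keys, a label value present in both sources resolves to the queue named in the file.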
74 changes: 74 additions & 0 deletions cmd/importer/pod/check.go
@@ -0,0 +1,74 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
	"context"
	"errors"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/kueue/cmd/importer/util"
)

func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
	ch := make(chan corev1.Pod)
	go func() {
		err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
		if err != nil {
			ctrl.LoggerFrom(ctx).Error(err, "Listing pods")
		}
	}()
	summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
		log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
		log.V(3).Info("Checking")

		cq, err := cache.ClusterQueue(p)
		if err != nil {
			return err
		}

		if len(cq.Spec.ResourceGroups) == 0 {
			return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid)
		}

		if len(cq.Spec.ResourceGroups[0].Flavors) == 0 {
			return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid)
		}

		rf := string(cq.Spec.ResourceGroups[0].Flavors[0].Name)
		if _, found := cache.ResourceFalvors[rf]; !found {
			return fmt.Errorf("%q flavor %q: %w", cq.Name, rf, util.ErrCQInvalid)
		}

		// do some additional checks like:
Contributor

Let's enhance the dry run:

Let's produce a list of the assignments to be done for each pod (local queue name, cluster queue name, flavor).

Contributor

As V(2)

Contributor Author

Something like "log.Info("Check succeeded" , "pod", kref.., "lq", LocalQueue ....)"?

Contributor

Maybe more like:
Dry run: Pod to be imported

Contributor Author

changed

		// - (maybe) the resources managed by the queues

		return nil
	})

	log := ctrl.LoggerFrom(ctx)
	log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods)
	for e, pods := range summary.ErrorsForPods {
		fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0])
	}
	return errors.Join(summary.Errors...)
}
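
The final report in `Check` prints each distinct error once, with a count and the first Pod it was seen for. A minimal sketch of that grouping is shown below. It assumes the summary keys Pods by error message. `podError` and `groupByMessage` are hypothetical names and do not reflect how `util.ConcurrentProcessPod` is implemented.

```go
package main

import (
	"fmt"
	"sort"
)

// podError pairs a pod name with the error message its check produced.
type podError struct {
	Pod string
	Msg string
}

// groupByMessage buckets pod names by error message, keeping the order in
// which the pods were seen within each bucket.
func groupByMessage(results []podError) map[string][]string {
	grouped := map[string][]string{}
	for _, r := range results {
		grouped[r.Msg] = append(grouped[r.Msg], r.Pod)
	}
	return grouped
}

func main() {
	grouped := groupByMessage([]podError{
		{Pod: "ns1/pod-a", Msg: "no mapping found"},
		{Pod: "ns1/pod-b", Msg: "cluster queue invalid"},
		{Pod: "ns2/pod-c", Msg: "no mapping found"},
	})

	// Map iteration order is random, so sort the messages for a stable report.
	msgs := make([]string, 0, len(grouped))
	for m := range grouped {
		msgs = append(msgs, m)
	}
	sort.Strings(msgs)

	for _, m := range msgs {
		pods := grouped[m]
		fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), m, pods[0])
	}
}
```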