New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pod importer #1825
Add pod importer #1825
Changes from 7 commits
87a4306
4525213
0e0b365
42e1363
09d7cec
32b896b
37dd2ce
e6e41fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Kueue Importer Tool | ||
|
||
A tool able to import existing pods into kueue. | ||
|
||
## Cluster setup. | ||
|
||
The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration framework disabled. Check Kueue's [installation guide](https://kueue.sigs.k8s.io/docs/installation/) and [Run Plain Pods](https://kueue.sigs.k8s.io/docs/tasks/run_plain_pods/#before-you-begin) for details. | ||
|
||
For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to be created in the cluster, the check stage of the importer will check this and enumerate the missing objects. | ||
|
||
## Build | ||
|
||
From kueue source root run: | ||
```bash | ||
go build -C cmd/importer/ -o $(pwd)/bin/importer | ||
|
||
``` | ||
|
||
## Usage | ||
|
||
The command runs against the systems default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to Configure Access to Multiple Clusters. | ||
|
||
The importer will perform following checks: | ||
|
||
- At least one `namespace` is provided. | ||
- The label key (`queuelabel`) providing the queue mapping is provided. | ||
- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists. | ||
- The LocalQueues involved in the import are using an existing ClusterQueue. | ||
- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads. | ||
|
||
After which, if `--dry-run=false` was specified, for each selected Pod the importer will: | ||
trasc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
- Update the Pod's Kueue related labels. | ||
- Create a Workload associated with the Pod. | ||
- Admit the Workload. | ||
|
||
### Example | ||
|
||
```bash | ||
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false | ||
``` | ||
Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set in LocalQueues `user-queue` or `user-queue2` depending on `src.lbl` value. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"maps" | ||
"os" | ||
|
||
"github.com/spf13/cobra" | ||
"go.uber.org/zap/zapcore" | ||
"gopkg.in/yaml.v2" | ||
"k8s.io/apimachinery/pkg/util/validation" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/log/zap" | ||
|
||
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" | ||
"sigs.k8s.io/kueue/cmd/importer/pod" | ||
"sigs.k8s.io/kueue/cmd/importer/util" | ||
"sigs.k8s.io/kueue/pkg/util/useragent" | ||
) | ||
|
||
const ( | ||
NamespaceFlag = "namespace" | ||
NamespaceFlagShort = "n" | ||
QueueMappingFlag = "queuemapping" | ||
QueueMappingFileFlag = "queuemapping-file" | ||
QueueLabelFlag = "queuelabel" | ||
QPSFlag = "qps" | ||
BurstFlag = "burst" | ||
VerbosityFlag = "verbose" | ||
VerboseFlagShort = "v" | ||
ConcurrencyFlag = "concurrent-workers" | ||
ConcurrencyFlagShort = "c" | ||
DryRunFlag = "dry-run" | ||
AddLabelsFlag = "add-labels" | ||
) | ||
|
||
var ( | ||
rootCmd = &cobra.Command{ | ||
Use: "importer", | ||
Short: "Import existing (running) objects into Kueue", | ||
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { | ||
v, _ := cmd.Flags().GetCount(VerbosityFlag) | ||
alculquicondor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
level := (v + 1) * -1 | ||
ctrl.SetLogger(zap.New( | ||
zap.UseDevMode(true), | ||
zap.ConsoleEncoder(), | ||
zap.Level(zapcore.Level(level)), | ||
)) | ||
return nil | ||
}, | ||
} | ||
) | ||
|
||
func setFlags(cmd *cobra.Command) { | ||
trasc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)") | ||
cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue") | ||
cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names") | ||
cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads") | ||
cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") | ||
cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") | ||
cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") | ||
cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers") | ||
cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only") | ||
|
||
_ = cmd.MarkFlagRequired(QueueLabelFlag) | ||
_ = cmd.MarkFlagRequired(NamespaceFlag) | ||
} | ||
|
||
func init() { | ||
rootCmd.AddGroup(&cobra.Group{ | ||
ID: "pod", | ||
Title: "Pods import", | ||
}) | ||
rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)") | ||
|
||
importCmd := &cobra.Command{ | ||
Use: "import", | ||
GroupID: "pod", | ||
Short: "Checks the prerequisites and import pods.", | ||
RunE: importCmd, | ||
} | ||
setFlags(importCmd) | ||
rootCmd.AddCommand(importCmd) | ||
} | ||
|
||
func main() { | ||
err := rootCmd.Execute() | ||
if err != nil { | ||
os.Exit(1) | ||
} | ||
} | ||
|
||
func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) (*util.ImportCache, error) { | ||
flags := cmd.Flags() | ||
namespaces, err := flags.GetStringSlice(NamespaceFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
queueLabel, err := flags.GetString(QueueLabelFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mapping, err := flags.GetStringToString(QueueMappingFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mappingFile, err := flags.GetString(QueueMappingFileFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if mappingFile != "" { | ||
yamlFile, err := os.ReadFile(mappingFile) | ||
if err != nil { | ||
return nil, err | ||
} | ||
extraMapping := map[string]string{} | ||
err = yaml.Unmarshal(yamlFile, extraMapping) | ||
if err != nil { | ||
return nil, fmt.Errorf("decoding %q: %w", mappingFile, err) | ||
} | ||
maps.Copy(mapping, extraMapping) | ||
} | ||
|
||
addLabels, err := flags.GetStringToString(AddLabelsFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var validationErrors []error | ||
for name, value := range addLabels { | ||
for _, err := range validation.IsQualifiedName(name) { | ||
validationErrors = append(validationErrors, fmt.Errorf("name %q: %s", name, err)) | ||
} | ||
for _, err := range validation.IsValidLabelValue(value) { | ||
validationErrors = append(validationErrors, fmt.Errorf("label %q value %q: %s", name, value, err)) | ||
} | ||
} | ||
if len(validationErrors) > 0 { | ||
return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...)) | ||
} | ||
|
||
return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels) | ||
} | ||
|
||
func getKubeClient(cmd *cobra.Command) (client.Client, error) { | ||
kubeConfig, err := ctrl.GetConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if kubeConfig.UserAgent == "" { | ||
kubeConfig.UserAgent = useragent.Default() | ||
} | ||
qps, err := cmd.Flags().GetFloat32(QPSFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kubeConfig.QPS = qps | ||
bust, err := cmd.Flags().GetInt(BurstFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kubeConfig.Burst = bust | ||
|
||
if err := kueue.AddToScheme(scheme.Scheme); err != nil { | ||
return nil, err | ||
} | ||
|
||
c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return c, nil | ||
} | ||
|
||
func importCmd(cmd *cobra.Command, _ []string) error { | ||
log := ctrl.Log.WithName("import") | ||
ctx := ctrl.LoggerInto(context.Background(), log) | ||
cWorkers, _ := cmd.Flags().GetUint(ConcurrencyFlag) | ||
c, err := getKubeClient(cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
cache, err := loadMappingCache(ctx, c, cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err = pod.Check(ctx, c, cache, cWorkers); err != nil { | ||
return err | ||
} | ||
|
||
if dr, _ := cmd.Flags().GetBool(DryRunFlag); dr { | ||
fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag) | ||
return nil | ||
} | ||
return pod.Import(ctx, c, cache, cWorkers) | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,74 @@ | ||||||
/* | ||||||
Copyright 2024 The Kubernetes Authors. | ||||||
|
||||||
Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
you may not use this file except in compliance with the License. | ||||||
You may obtain a copy of the License at | ||||||
|
||||||
http://www.apache.org/licenses/LICENSE-2.0 | ||||||
|
||||||
Unless required by applicable law or agreed to in writing, software | ||||||
distributed under the License is distributed on an "AS IS" BASIS, | ||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
See the License for the specific language governing permissions and | ||||||
limitations under the License. | ||||||
*/ | ||||||
|
||||||
package pod | ||||||
|
||||||
import ( | ||||||
"context" | ||||||
"errors" | ||||||
"fmt" | ||||||
|
||||||
corev1 "k8s.io/api/core/v1" | ||||||
"k8s.io/klog/v2" | ||||||
ctrl "sigs.k8s.io/controller-runtime" | ||||||
"sigs.k8s.io/controller-runtime/pkg/client" | ||||||
|
||||||
"sigs.k8s.io/kueue/cmd/importer/util" | ||||||
) | ||||||
|
||||||
func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error { | ||||||
ch := make(chan corev1.Pod) | ||||||
go func() { | ||||||
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch) | ||||||
if err != nil { | ||||||
ctrl.LoggerFrom(ctx).Error(err, "Listing pods") | ||||||
} | ||||||
}() | ||||||
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error { | ||||||
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) | ||||||
log.V(3).Info("Checking") | ||||||
|
||||||
cq, err := cache.ClusterQueue(p) | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
if len(cq.Spec.ResourceGroups) == 0 { | ||||||
return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid) | ||||||
} | ||||||
|
||||||
if len(cq.Spec.ResourceGroups[0].Flavors) == 0 { | ||||||
return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid) | ||||||
} | ||||||
|
||||||
rf := string(cq.Spec.ResourceGroups[0].Flavors[0].Name) | ||||||
if _, found := cache.ResourceFalvors[rf]; !found { | ||||||
return fmt.Errorf("%q flavor %q: %w", cq.Name, rf, util.ErrCQInvalid) | ||||||
} | ||||||
|
||||||
// do some additional checks like: | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's enhance the dry run: Let's produce a list of the assignments to be done for each pod (local queue name, cluster queue name, flavor). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As V(2) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Someting like "log.Info("Check succeeded" , "pod", kref.., "lq", LocalQueue ....)"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe more like: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed |
||||||
// - (maybe) the resources managed by the queues | ||||||
|
||||||
return nil | ||||||
}) | ||||||
|
||||||
log := ctrl.LoggerFrom(ctx) | ||||||
log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods) | ||||||
for e, pods := range summary.ErrorsForPods { | ||||||
log.Info("Error", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0]) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||||
} | ||||||
return errors.Join(summary.Errors...) | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why at least one? What will happen if it uses non-existing ResourceFlavors?
Probably we should at least log warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the first resource flavor is not created, the check will fail.