generated from kubernetes/kubernetes-template-project
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pod importer #1825
Merged
Merged
Add pod importer #1825
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
87a4306
[pod] Export `FromObject()`
trasc 4525213
[cmd/importer] Initial implementation.
trasc 0e0b365
Review Remarks
trasc 42e1363
Add extra labels
trasc 09d7cec
Review Remarks
trasc 32b896b
Review Remarks
trasc 37dd2ce
Review Remarks
trasc e6e41fb
Review Remarks
trasc File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Kueue Importer Tool | ||
|
||
A tool able to import existing pods into kueue. | ||
|
||
## Cluster setup. | ||
|
||
The importer should run in a cluster having the Kueue CRDs defined and in which the `kueue-controller-manager` is not running or has the `pod` integration framework disabled. Check Kueue's [installation guide](https://kueue.sigs.k8s.io/docs/installation/) and [Run Plain Pods](https://kueue.sigs.k8s.io/docs/tasks/run_plain_pods/#before-you-begin) for details. | ||
|
||
For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to be created in the cluster, the check stage of the importer will check this and enumerate the missing objects. | ||
|
||
## Build | ||
|
||
From kueue source root run: | ||
```bash | ||
go build -C cmd/importer/ -o $(pwd)/bin/importer | ||
|
||
``` | ||
|
||
## Usage | ||
|
||
The command runs against the systems default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to Configure Access to Multiple Clusters. | ||
|
||
The importer will perform following checks: | ||
|
||
- At least one `namespace` is provided. | ||
- The label key (`queuelabel`) providing the queue mapping is provided. | ||
- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists. | ||
- The LocalQueues involved in the import are using an existing ClusterQueue. | ||
- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads. | ||
|
||
After which, if `--dry-run=false` was specified, for each selected Pod the importer will: | ||
trasc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
- Update the Pod's Kueue related labels. | ||
- Create a Workload associated with the Pod. | ||
- Admit the Workload. | ||
|
||
### Example | ||
|
||
```bash | ||
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false | ||
``` | ||
Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set in LocalQueues `user-queue` or `user-queue2` depending on `src.lbl` value. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"maps" | ||
"os" | ||
|
||
"github.com/spf13/cobra" | ||
"go.uber.org/zap/zapcore" | ||
"gopkg.in/yaml.v2" | ||
"k8s.io/apimachinery/pkg/util/validation" | ||
"k8s.io/client-go/kubernetes/scheme" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/log/zap" | ||
|
||
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" | ||
"sigs.k8s.io/kueue/cmd/importer/pod" | ||
"sigs.k8s.io/kueue/cmd/importer/util" | ||
"sigs.k8s.io/kueue/pkg/util/useragent" | ||
) | ||
|
||
const ( | ||
NamespaceFlag = "namespace" | ||
NamespaceFlagShort = "n" | ||
QueueMappingFlag = "queuemapping" | ||
QueueMappingFileFlag = "queuemapping-file" | ||
QueueLabelFlag = "queuelabel" | ||
QPSFlag = "qps" | ||
BurstFlag = "burst" | ||
VerbosityFlag = "verbose" | ||
VerboseFlagShort = "v" | ||
ConcurrencyFlag = "concurrent-workers" | ||
ConcurrencyFlagShort = "c" | ||
DryRunFlag = "dry-run" | ||
AddLabelsFlag = "add-labels" | ||
) | ||
|
||
var ( | ||
rootCmd = &cobra.Command{ | ||
Use: "importer", | ||
Short: "Import existing (running) objects into Kueue", | ||
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error { | ||
v, _ := cmd.Flags().GetCount(VerbosityFlag) | ||
alculquicondor marked this conversation as resolved.
Show resolved
Hide resolved
|
||
level := (v + 1) * -1 | ||
ctrl.SetLogger(zap.New( | ||
zap.UseDevMode(true), | ||
zap.ConsoleEncoder(), | ||
zap.Level(zapcore.Level(level)), | ||
)) | ||
return nil | ||
}, | ||
} | ||
) | ||
|
||
func setFlags(cmd *cobra.Command) { | ||
trasc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)") | ||
cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue") | ||
cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names") | ||
cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads") | ||
cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names") | ||
cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") | ||
cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit") | ||
cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers") | ||
cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only") | ||
|
||
_ = cmd.MarkFlagRequired(QueueLabelFlag) | ||
_ = cmd.MarkFlagRequired(NamespaceFlag) | ||
} | ||
|
||
func init() { | ||
rootCmd.AddGroup(&cobra.Group{ | ||
ID: "pod", | ||
Title: "Pods import", | ||
}) | ||
rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)") | ||
|
||
importCmd := &cobra.Command{ | ||
Use: "import", | ||
GroupID: "pod", | ||
Short: "Checks the prerequisites and import pods.", | ||
RunE: importCmd, | ||
} | ||
setFlags(importCmd) | ||
rootCmd.AddCommand(importCmd) | ||
} | ||
|
||
func main() { | ||
err := rootCmd.Execute() | ||
if err != nil { | ||
os.Exit(1) | ||
} | ||
} | ||
|
||
func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command) (*util.ImportCache, error) { | ||
flags := cmd.Flags() | ||
namespaces, err := flags.GetStringSlice(NamespaceFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
queueLabel, err := flags.GetString(QueueLabelFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mapping, err := flags.GetStringToString(QueueMappingFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mappingFile, err := flags.GetString(QueueMappingFileFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if mappingFile != "" { | ||
yamlFile, err := os.ReadFile(mappingFile) | ||
if err != nil { | ||
return nil, err | ||
} | ||
extraMapping := map[string]string{} | ||
err = yaml.Unmarshal(yamlFile, extraMapping) | ||
if err != nil { | ||
return nil, fmt.Errorf("decoding %q: %w", mappingFile, err) | ||
} | ||
maps.Copy(mapping, extraMapping) | ||
} | ||
|
||
addLabels, err := flags.GetStringToString(AddLabelsFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var validationErrors []error | ||
for name, value := range addLabels { | ||
for _, err := range validation.IsQualifiedName(name) { | ||
validationErrors = append(validationErrors, fmt.Errorf("name %q: %s", name, err)) | ||
} | ||
for _, err := range validation.IsValidLabelValue(value) { | ||
validationErrors = append(validationErrors, fmt.Errorf("label %q value %q: %s", name, value, err)) | ||
} | ||
} | ||
if len(validationErrors) > 0 { | ||
return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...)) | ||
} | ||
|
||
return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels) | ||
} | ||
|
||
func getKubeClient(cmd *cobra.Command) (client.Client, error) { | ||
kubeConfig, err := ctrl.GetConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
if kubeConfig.UserAgent == "" { | ||
kubeConfig.UserAgent = useragent.Default() | ||
} | ||
qps, err := cmd.Flags().GetFloat32(QPSFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kubeConfig.QPS = qps | ||
bust, err := cmd.Flags().GetInt(BurstFlag) | ||
if err != nil { | ||
return nil, err | ||
} | ||
kubeConfig.Burst = bust | ||
|
||
if err := kueue.AddToScheme(scheme.Scheme); err != nil { | ||
return nil, err | ||
} | ||
|
||
c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return c, nil | ||
} | ||
|
||
func importCmd(cmd *cobra.Command, _ []string) error { | ||
log := ctrl.Log.WithName("import") | ||
ctx := ctrl.LoggerInto(context.Background(), log) | ||
cWorkers, _ := cmd.Flags().GetUint(ConcurrencyFlag) | ||
c, err := getKubeClient(cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
cache, err := loadMappingCache(ctx, c, cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err = pod.Check(ctx, c, cache, cWorkers); err != nil { | ||
return err | ||
} | ||
|
||
if dr, _ := cmd.Flags().GetBool(DryRunFlag); dr { | ||
fmt.Printf("%q is enabled by default, use \"--%s=false\" to continue with the import\n", DryRunFlag, DryRunFlag) | ||
return nil | ||
} | ||
return pod.Import(ctx, c, cache, cWorkers) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
Copyright 2024 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package pod | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
|
||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/klog/v2" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
"sigs.k8s.io/kueue/cmd/importer/util" | ||
) | ||
|
||
func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error { | ||
ch := make(chan corev1.Pod) | ||
go func() { | ||
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch) | ||
if err != nil { | ||
ctrl.LoggerFrom(ctx).Error(err, "Listing pods") | ||
} | ||
}() | ||
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error { | ||
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)) | ||
log.V(3).Info("Checking") | ||
|
||
cq, err := cache.ClusterQueue(p) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if len(cq.Spec.ResourceGroups) == 0 { | ||
return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid) | ||
} | ||
|
||
if len(cq.Spec.ResourceGroups[0].Flavors) == 0 { | ||
return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid) | ||
} | ||
|
||
rfName := string(cq.Spec.ResourceGroups[0].Flavors[0].Name) | ||
rf, rfFound := cache.ResourceFalvors[rfName] | ||
if !rfFound { | ||
return fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid) | ||
} | ||
|
||
log.V(2).Info("Successfully checked", "pod", klog.KObj(p), "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf)) | ||
return nil | ||
}) | ||
|
||
log := ctrl.LoggerFrom(ctx) | ||
log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods) | ||
for e, pods := range summary.ErrorsForPods { | ||
log.Info("Validation failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0]) | ||
} | ||
return errors.Join(summary.Errors...) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why at least one? What will happen if it uses non-existing ResourceFlavors?
Probably we should at least log warning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the first resource flavor is not created, the check will fail.