
[importer] Extend mapping capabilities. #1878

Merged
merged 3 commits into from Mar 21, 2024
61 changes: 59 additions & 2 deletions cmd/importer/README.md
@@ -20,14 +20,47 @@ go build -C cmd/importer/ -o $(pwd)/bin/importer

The command runs against the system's default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to Configure Access to Multiple Clusters.

### Check
The importer will perform the following checks:

- At least one `namespace` is provided.
- The label key (`queuelabel`) used for the queue mapping is provided.
- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists.
- For every Pod a mapping to a LocalQueue is available.
- The target LocalQueue exists.
- The LocalQueues involved in the import are using an existing ClusterQueue.
- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads.

There are two ways the mapping from a pod to a LocalQueue can be specified:

#### Simple mapping

It's done by specifying a label name and any number of `<label-value>=<localQueue-name>` pairs as command line arguments, e.g. `--queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2`.
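
For illustration, here is a minimal sketch of how such pairs could be expanded into the rule form used internally (the actual importer does this through `util.MappingRulesForLabel`, visible in the `main.go` diff further down; the types and function body below are assumptions, not the real implementation):

```go
// Hypothetical sketch: expanding a simple --queuelabel/--queuemapping pair list
// into per-value mapping rules. Field names mirror the MappingRule/MappingMatch
// structures used in the tests below; the function body is an assumption.
package main

import "fmt"

type MappingMatch struct {
	PriorityClassName string
	Labels            map[string]string
}

type MappingRule struct {
	Match        MappingMatch
	ToLocalQueue string
	Skip         bool
}

// mappingRulesForLabel builds one rule per <label-value>=<localQueue-name> pair.
func mappingRulesForLabel(label string, valueToQueue map[string]string) []MappingRule {
	rules := make([]MappingRule, 0, len(valueToQueue))
	for value, queue := range valueToQueue {
		rules = append(rules, MappingRule{
			Match:        MappingMatch{Labels: map[string]string{label: value}},
			ToLocalQueue: queue,
		})
	}
	return rules
}

func main() {
	// Equivalent of --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2
	rules := mappingRulesForLabel("src.lbl", map[string]string{
		"src-val":  "user-queue",
		"src-val2": "user-queue2",
	})
	fmt.Printf("%+v\n", rules)
}
```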

#### Advanced mapping

It's done by providing a YAML mapping file name via the `--queuemapping-file` argument, its expected content being:

```yaml
- match:
    labels:
      src.lbl: src-val
  toLocalQueue: user-queue
- match:
    priorityClassName: p-class
    labels:
      src.lbl: src-val2
      src2.lbl: src2-val
  toLocalQueue: user-queue2
- match:
    labels:
      src.lbl: src-val3
  skip: true
```

- During the mapping, if a match rule has no `priorityClassName`, the pod's `priorityClassName` is ignored; if more than one `label: value` pair is provided, all of them must match.
- The rules are evaluated in order.
- `skip: true` can be used to ignore the pods matching a rule (a sketch of this evaluation logic follows below).
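
A minimal sketch of this ordered evaluation, for illustration only. The `MappingRule`/`MappingMatch` field names follow the test code further down in this PR, but the `localQueueFor` helper and its behaviour are assumptions rather than the importer's actual implementation:

```go
// Hypothetical sketch of ordered mapping-rule evaluation: the first matching
// rule decides the LocalQueue, or marks the pod as skipped.
package main

import "fmt"

type MappingMatch struct {
	PriorityClassName string
	Labels            map[string]string
}

type MappingRule struct {
	Match        MappingMatch
	ToLocalQueue string
	Skip         bool
}

// localQueueFor returns the LocalQueue name for the pod, whether the pod
// should be skipped, and whether any rule matched at all.
func localQueueFor(rules []MappingRule, podLabels map[string]string, priorityClassName string) (queue string, skip bool, matched bool) {
	for _, r := range rules {
		// An empty priorityClassName in the rule means the pod's value is ignored.
		if r.Match.PriorityClassName != "" && r.Match.PriorityClassName != priorityClassName {
			continue
		}
		// Every label listed in the rule must be present with the same value.
		allMatch := true
		for k, v := range r.Match.Labels {
			if podLabels[k] != v {
				allMatch = false
				break
			}
		}
		if !allMatch {
			continue
		}
		return r.ToLocalQueue, r.Skip, true
	}
	return "", false, false
}

func main() {
	rules := []MappingRule{
		{Match: MappingMatch{Labels: map[string]string{"src.lbl": "src-val"}}, ToLocalQueue: "user-queue"},
		{Match: MappingMatch{PriorityClassName: "p-class", Labels: map[string]string{"src.lbl": "src-val2", "src2.lbl": "src2-val"}}, ToLocalQueue: "user-queue2"},
		{Match: MappingMatch{Labels: map[string]string{"src.lbl": "src-val3"}}, Skip: true},
	}
	q, skip, ok := localQueueFor(rules, map[string]string{"src.lbl": "src-val2", "src2.lbl": "src2-val"}, "p-class")
	fmt.Println(q, skip, ok) // user-queue2 false true
}
```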
Contributor:

could the user leave a rule with no priorityClassName and no labels with skip:true?

That could be a way of saying: if it doesn't match any rule above, I just don't care about this pod.

Contributor Author:

yes, `- skip: true` at the end is doing that.

similarly, `- toLocalQueue: user-queue2` will put all the pods in one localQueue.


### Import
After which, if `--dry-run=false` was specified, for each selected Pod the importer will:

- Update the Pod's Kueue related labels.
@@ -36,7 +69,31 @@ After which, if `--dry-run=false` was specified, for each selected Pod the impor

### Example

#### Simple mapping

```bash
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false
```
Will import all the pods in namespaces `ns1` or `ns2` having the label `src.lbl` set, into LocalQueues `user-queue` or `user-queue2` depending on the `src.lbl` value.

#### Advanced mapping
With mapping file:
```yaml
- match:
    labels:
      src.lbl: src-val
  toLocalQueue: user-queue
- match:
    priorityClassName: p-class
    labels:
      src.lbl: src-val2
      src2.lbl: src2-val
  toLocalQueue: user-queue2
```

```bash
./bin/importer import -n ns1,ns2 --queuemapping-file=<mapping-file-path> --dry-run=false
```

Contributor:

Could script return done or smth if the import finished successfully?

Will import all the pods in namespaces `ns1` or `ns2` having the label `src.lbl` set to `src-val` into LocalQueue `user-queue`, regardless of their priorityClassName, and those with `src.lbl==src-val2`, `src2.lbl==src2-val` and `priorityClassName==p-class` into `user-queue2`.

25 changes: 11 additions & 14 deletions cmd/importer/main.go
@@ -20,12 +20,10 @@ import (
"context"
"errors"
"fmt"
"maps"
"os"

"github.com/spf13/cobra"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
@@ -82,8 +80,10 @@ func setFlags(cmd *cobra.Command) {
cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers")
cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only")

_ = cmd.MarkFlagRequired(QueueLabelFlag)
_ = cmd.MarkFlagRequired(NamespaceFlag)
cmd.MarkFlagsRequiredTogether(QueueLabelFlag, QueueMappingFlag)
cmd.MarkFlagsOneRequired(QueueLabelFlag, QueueMappingFileFlag)
cmd.MarkFlagsMutuallyExclusive(QueueLabelFlag, QueueMappingFileFlag)
}

func init() {
@@ -116,12 +116,8 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command)
if err != nil {
return nil, err
}
queueLabel, err := flags.GetString(QueueLabelFlag)
if err != nil {
return nil, err
}

mapping, err := flags.GetStringToString(QueueMappingFlag)
queueLabel, err := flags.GetString(QueueLabelFlag)
if err != nil {
return nil, err
}
@@ -131,17 +127,18 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command)
return nil, err
}

var mapping util.MappingRules
if mappingFile != "" {
yamlFile, err := os.ReadFile(mappingFile)
mapping, err = util.MappingRulesFromFile(mappingFile)
if err != nil {
return nil, err
}
extraMapping := map[string]string{}
err = yaml.Unmarshal(yamlFile, extraMapping)
} else {
queueLabelMapping, err := flags.GetStringToString(QueueMappingFlag)
if err != nil {
return nil, fmt.Errorf("decoding %q: %w", mappingFile, err)
return nil, err
}
maps.Copy(mapping, extraMapping)
mapping = util.MappingRulesForLabel(queueLabel, queueLabelMapping)
}

addLabels, err := flags.GetStringToString(AddLabelsFlag)
@@ -162,7 +159,7 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command)
return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...))
}

return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels)
return util.LoadImportCache(ctx, c, namespaces, mapping, addLabels)
}

func getKubeClient(cmd *cobra.Command) (client.Client, error) {
31 changes: 20 additions & 11 deletions cmd/importer/pod/check.go
@@ -32,40 +32,49 @@ import (
func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
ch := make(chan corev1.Pod)
go func() {
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
err := util.PushPods(ctx, c, cache.Namespaces, ch)
if err != nil {
ctrl.LoggerFrom(ctx).Error(err, "Listing pods")
}
}()
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) (bool, error) {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Checking")

cq, err := cache.ClusterQueue(p)
if err != nil {
return err
cq, skip, err := cache.ClusterQueue(p)
if skip || err != nil {
return skip, err
}

if len(cq.Spec.ResourceGroups) == 0 {
return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid)
return false, fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid)
}

if len(cq.Spec.ResourceGroups[0].Flavors) == 0 {
return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid)
return false, fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid)
}

rfName := string(cq.Spec.ResourceGroups[0].Flavors[0].Name)
rf, rfFound := cache.ResourceFalvors[rfName]
if !rfFound {
return fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid)
return false, fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid)
}

var pv int32
if pc, found := cache.PriorityClasses[p.Spec.PriorityClassName]; found {
pv = pc.Value
} else {
if p.Spec.PriorityClassName != "" {
return false, fmt.Errorf("%q: %w", p.Spec.PriorityClassName, util.ErrPCNotFound)
}
}

log.V(2).Info("Successfully checked", "pod", klog.KObj(p), "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf))
return nil
log.V(2).Info("Successfully checked", "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf), "priority", pv)
return false, nil
})

log := ctrl.LoggerFrom(ctx)
log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods)
log.Info("Check done", "checked", summary.TotalPods, "skipped", summary.SkippedPods, "failed", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
log.Info("Validation failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0])
}
52 changes: 42 additions & 10 deletions cmd/importer/pod/check_test.go
@@ -46,7 +46,7 @@ func TestCheckNamespace(t *testing.T) {
pods []corev1.Pod
clusteQueues []kueue.ClusterQueue
localQueues []kueue.LocalQueue
mapping map[string]string
mapping util.MappingRules
flavors []kueue.ResourceFlavor

wantError error
@@ -62,17 +62,33 @@ func TestCheckNamespace(t *testing.T) {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
wantError: util.ErrLQNotFound,
},
"no cluster queue": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
@@ -83,8 +99,16 @@ func TestCheckNamespace(t *testing.T) {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
@@ -98,8 +122,16 @@ func TestCheckNamespace(t *testing.T) {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
@@ -126,7 +158,7 @@ func TestCheckNamespace(t *testing.T) {
client := builder.Build()
ctx := context.Background()

mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping, nil)
mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, tc.mapping, nil)
gotErr := Check(ctx, client, mpc, 8)

if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
25 changes: 12 additions & 13 deletions cmd/importer/pod/import.go
@@ -43,34 +43,34 @@ import (
func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
ch := make(chan corev1.Pod)
go func() {
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
err := util.PushPods(ctx, c, cache.Namespaces, ch)
if err != nil {
ctrl.LoggerFrom(ctx).Error(err, "Listing pods")
}
}()
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) (bool, error) {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Importing")

lq, err := cache.LocalQueue(p)
if err != nil {
return err
lq, skip, err := cache.LocalQueue(p)
if skip || err != nil {
return skip, err
}

oldLq, found := p.Labels[controllerconstants.QueueLabel]
if !found {
if err := addLabels(ctx, c, p, lq.Name, cache.AddLabels); err != nil {
return fmt.Errorf("cannot add queue label: %w", err)
return false, fmt.Errorf("cannot add queue label: %w", err)
}
} else if oldLq != lq.Name {
return fmt.Errorf("another local queue name is set %q expecting %q", oldLq, lq.Name)
return false, fmt.Errorf("another local queue name is set %q expecting %q", oldLq, lq.Name)
}

kp := pod.FromObject(p)
// Note: the recorder is not used for single pods, we can just pass nil for now.
wl, err := kp.ConstructComposableWorkload(ctx, c, nil)
if err != nil {
return fmt.Errorf("construct workload: %w", err)
return false, fmt.Errorf("construct workload: %w", err)
}

maps.Copy(wl.Labels, cache.AddLabels)
@@ -82,12 +82,11 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs
}

if err := createWorkload(ctx, c, wl); err != nil {
return fmt.Errorf("creating workload: %w", err)
return false, fmt.Errorf("creating workload: %w", err)

}

//make its admission and update its status

info := workload.NewInfo(wl)
cq := cache.ClusterQueues[string(lq.Spec.ClusterQueue)]
admission := kueue.Admission{
@@ -122,14 +121,14 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs
}
apimeta.SetStatusCondition(&wl.Status.Conditions, admittedCond)
if err := admitWorkload(ctx, c, wl); err != nil {
return err
return false, err
}
log.V(2).Info("Successfully imported", "pod", klog.KObj(p), "workload", klog.KObj(wl))
return nil
return false, nil
})

log := ctrl.LoggerFrom(ctx)
log.Info("Import done", "checked", summary.TotalPods, "failed", summary.FailedPods)
log.Info("Import done", "checked", summary.TotalPods, "skipped", summary.SkippedPods, "failed", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
log.Info("Import failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0])
}