[importer] Extend mapping capabilities. (kubernetes-sigs#1878)
* [cmd/importer] Advanced mapping.

* [cmd/importer] Add skip.

* Review Remarks
trasc authored and vsoch committed Apr 18, 2024
1 parent 2e5eb74 commit 1fecb12
Showing 10 changed files with 415 additions and 98 deletions.
61 changes: 59 additions & 2 deletions cmd/importer/README.md
@@ -20,14 +20,47 @@ go build -C cmd/importer/ -o $(pwd)/bin/importer

The command runs against the system's default kubectl configuration. Check the [kubectl documentation](https://kubernetes.io/docs/tasks/access-application-cluster/configure-access-multiple-clusters/) to learn more about how to configure access to multiple clusters.
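For example, standard kubectl commands can be used to verify and select the cluster the importer will run against (the context name is illustrative):

```bash
# Show the context the importer will use.
kubectl config current-context

# Switch to another cluster if needed.
kubectl config use-context my-cluster
```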

### Check
The importer will perform the following checks (a minimal set of Kueue objects satisfying them is sketched after this list):

- At least one `namespace` is provided.
- The label key (`queuelabel`) used for the queue mapping is provided.
- A mapping from one of the encountered `queuelabel` values to an existing LocalQueue exists.
- For every Pod a mapping to a LocalQueue is available.
- The target LocalQueue exists.
- The LocalQueues involved in the import are using an existing ClusterQueue.
- The ClusterQueues involved have at least one ResourceGroup using an existing ResourceFlavor. This ResourceFlavor is used when the importer creates the admission for the created workloads.
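For illustration, a minimal set of Kueue objects that would satisfy these checks, using the queue names from the examples below (the flavor name and quotas are placeholders):

```yaml
apiVersion: kueue.x-k8s.io/v1beta1
kind: ResourceFlavor
metadata:
  name: default-flavor
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: ClusterQueue
metadata:
  name: cluster-queue
spec:
  namespaceSelector: {} # match all namespaces
  resourceGroups:
  - coveredResources: ["cpu", "memory"]
    flavors:
    - name: default-flavor
      resources:
      - name: "cpu"
        nominalQuota: 9
      - name: "memory"
        nominalQuota: 36Gi
---
apiVersion: kueue.x-k8s.io/v1beta1
kind: LocalQueue
metadata:
  namespace: ns1
  name: user-queue
spec:
  clusterQueue: cluster-queue
```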

There are two ways the mapping from a Pod to a LocalQueue can be specified:

#### Simple mapping

It's done by specifying a label name and any number of `<label-value>=<localQueue-name>` pairs as command line arguments, e.g. `--queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2`.

#### Advanced mapping

It's done by providing a YAML mapping file name as the `--queuemapping-file` argument, its expected content being:

```yaml
- match:
    labels:
      src.lbl: src-val
  toLocalQueue: user-queue
- match:
    priorityClassName: p-class
    labels:
      src.lbl: src-val2
      src2.lbl: src2-val
  toLocalQueue: user-queue2
- match:
    labels:
      src.lbl: src-val3
  skip: true
```

- During the mapping, if a match rule has no `priorityClassName`, the `priorityClassName` of the pod is ignored; if more than one `label: value` pair is provided, all of them must match (see the sketch after this list).
- The rules are evaluated in order.
- `skip: true` can be used to ignore the pods matching a rule.
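For illustration, the matching semantics can be sketched in Go. The `MappingRule`/`MappingMatch` shapes below mirror the test fixtures in this commit; the `Skip` field name is an assumption derived from the `skip:` YAML key, not confirmed from the source:

```go
package util

import corev1 "k8s.io/api/core/v1"

// Rule shapes mirroring the commit's test fixtures; Skip is assumed.
type MappingMatch struct {
	PriorityClassName string
	Labels            map[string]string
}

type MappingRule struct {
	Match        MappingMatch
	ToLocalQueue string
	Skip         bool // assumed, mirroring `skip: true`
}

type MappingRules []MappingRule

// findLocalQueue resolves a Pod against the rules: they are tried in
// order, and the first rule whose priorityClassName (when set) and all
// listed labels match wins.
func findLocalQueue(rules MappingRules, pod *corev1.Pod) (queue string, skip, found bool) {
	for _, rule := range rules {
		if rule.Match.PriorityClassName != "" &&
			rule.Match.PriorityClassName != pod.Spec.PriorityClassName {
			continue
		}
		labelsMatch := true
		for k, v := range rule.Match.Labels {
			if pod.Labels[k] != v {
				labelsMatch = false
				break
			}
		}
		if !labelsMatch {
			continue
		}
		if rule.Skip {
			return "", true, true
		}
		return rule.ToLocalQueue, false, true
	}
	return "", false, false
}
```

With the mapping file above, a pod labeled `src.lbl=src-val2` and `src2.lbl=src2-val` but without `priorityClassName: p-class` matches none of the three rules and would be reported as unmapped.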

### Import
After the checks pass, if `--dry-run=false` was specified, for each selected Pod the importer will:

- Update the Pod's Kueue related labels.
@@ -36,7 +69,31 @@ After which, if `--dry-run=false` was specified, for each selected Pod the impor

### Example

#### Simple mapping

```bash
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val2=user-queue2 --dry-run=false
```
Will import all the pods in namespaces `ns1` or `ns2` having the label `src.lbl` set, into LocalQueue `user-queue` or `user-queue2` depending on the `src.lbl` value.

#### Advanced mapping
With the mapping file:
```yaml
- match:
    labels:
      src.lbl: src-val
  toLocalQueue: user-queue
- match:
    priorityClassName: p-class
    labels:
      src.lbl: src-val2
      src2.lbl: src2-val
  toLocalQueue: user-queue2
```

```bash
./bin/importer import -n ns1,ns2 --queuemapping-file=<mapping-file-path> --dry-run=false
```

Will import all the pods in namespaces `ns1` or `ns2` having the label `src.lbl` set to `src-val` into LocalQueue `user-queue`, regardless of their `priorityClassName`, and those with `src.lbl==src-val2`, `src2.lbl==src2-val` and `priorityClassName==p-class` into `user-queue2`.

25 changes: 11 additions & 14 deletions cmd/importer/main.go
@@ -20,12 +20,10 @@ import (
"context"
"errors"
"fmt"
"maps"
"os"

"github.com/spf13/cobra"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
@@ -82,8 +80,10 @@ func setFlags(cmd *cobra.Command) {
cmd.Flags().UintP(ConcurrencyFlag, ConcurrencyFlagShort, 8, "number of concurrent import workers")
cmd.Flags().Bool(DryRunFlag, true, "don't import, check the config only")

_ = cmd.MarkFlagRequired(QueueLabelFlag)
_ = cmd.MarkFlagRequired(NamespaceFlag)
cmd.MarkFlagsRequiredTogether(QueueLabelFlag, QueueMappingFlag)
cmd.MarkFlagsOneRequired(QueueLabelFlag, QueueMappingFileFlag)
cmd.MarkFlagsMutuallyExclusive(QueueLabelFlag, QueueMappingFileFlag)
}

func init() {
@@ -116,12 +116,8 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command)
if err != nil {
return nil, err
}
queueLabel, err := flags.GetString(QueueLabelFlag)
if err != nil {
return nil, err
}

mapping, err := flags.GetStringToString(QueueMappingFlag)
queueLabel, err := flags.GetString(QueueLabelFlag)
if err != nil {
return nil, err
}
@@ -131,17 +127,18 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command)
return nil, err
}

var mapping util.MappingRules
if mappingFile != "" {
yamlFile, err := os.ReadFile(mappingFile)
mapping, err = util.MappingRulesFromFile(mappingFile)
if err != nil {
return nil, err
}
extraMapping := map[string]string{}
err = yaml.Unmarshal(yamlFile, extraMapping)
} else {
queueLabelMapping, err := flags.GetStringToString(QueueMappingFlag)
if err != nil {
return nil, fmt.Errorf("decoding %q: %w", mappingFile, err)
return nil, err
}
maps.Copy(mapping, extraMapping)
mapping = util.MappingRulesForLabel(queueLabel, queueLabelMapping)
}

addLabels, err := flags.GetStringToString(AddLabelsFlag)
@@ -162,7 +159,7 @@ func loadMappingCache(ctx context.Context, c client.Client, cmd *cobra.Command)
return nil, fmt.Errorf("%s: %w", AddLabelsFlag, errors.Join(validationErrors...))
}

return util.LoadImportCache(ctx, c, namespaces, queueLabel, mapping, addLabels)
return util.LoadImportCache(ctx, c, namespaces, mapping, addLabels)
}

func getKubeClient(cmd *cobra.Command) (client.Client, error) {
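For context, a plausible shape of `util.MappingRulesForLabel`, inferred from the call site above and reusing the rule shapes from the earlier sketch (not necessarily the actual implementation):

```go
// MappingRulesForLabel sketches the conversion of the simple
// `--queuemapping` form: each <label-value>=<LocalQueue> pair becomes one
// rule matching that single label. Iteration order over the map is random,
// but the resulting rules are disjoint, so ordering does not matter here.
func MappingRulesForLabel(label string, mapping map[string]string) MappingRules {
	rules := make(MappingRules, 0, len(mapping))
	for value, queue := range mapping {
		rules = append(rules, MappingRule{
			Match:        MappingMatch{Labels: map[string]string{label: value}},
			ToLocalQueue: queue,
		})
	}
	return rules
}
```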
31 changes: 20 additions & 11 deletions cmd/importer/pod/check.go
@@ -32,40 +32,49 @@ import (
func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
ch := make(chan corev1.Pod)
go func() {
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
err := util.PushPods(ctx, c, cache.Namespaces, ch)
if err != nil {
ctrl.LoggerFrom(ctx).Error(err, "Listing pods")
}
}()
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) (bool, error) {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Checking")

cq, err := cache.ClusterQueue(p)
if err != nil {
return err
cq, skip, err := cache.ClusterQueue(p)
if skip || err != nil {
return skip, err
}

if len(cq.Spec.ResourceGroups) == 0 {
return fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid)
return false, fmt.Errorf("%q has no resource groups: %w", cq.Name, util.ErrCQInvalid)
}

if len(cq.Spec.ResourceGroups[0].Flavors) == 0 {
return fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid)
return false, fmt.Errorf("%q has no resource groups flavors: %w", cq.Name, util.ErrCQInvalid)
}

rfName := string(cq.Spec.ResourceGroups[0].Flavors[0].Name)
rf, rfFound := cache.ResourceFalvors[rfName]
if !rfFound {
return fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid)
return false, fmt.Errorf("%q flavor %q: %w", cq.Name, rfName, util.ErrCQInvalid)
}

var pv int32
if pc, found := cache.PriorityClasses[p.Spec.PriorityClassName]; found {
pv = pc.Value
} else {
if p.Spec.PriorityClassName != "" {
return false, fmt.Errorf("%q: %w", p.Spec.PriorityClassName, util.ErrPCNotFound)
}
}

log.V(2).Info("Successfully checked", "pod", klog.KObj(p), "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf))
return nil
log.V(2).Info("Successfully checked", "clusterQueue", klog.KObj(cq), "resourceFalvor", klog.KObj(rf), "priority", pv)
return false, nil
})

log := ctrl.LoggerFrom(ctx)
log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods)
log.Info("Check done", "checked", summary.TotalPods, "skipped", summary.SkippedPods, "failed", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
log.Info("Validation failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0])
}
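`ConcurrentProcessPod` callbacks now return `(skip bool, err error)` and the summary gains a skipped counter. A sketch of how the helper could aggregate these results, with field names taken from the call sites above and the error-map key type assumed:

```go
package util

import (
	"fmt"
	"sync"

	corev1 "k8s.io/api/core/v1"
)

// ProcessSummary mirrors the fields read by Check and Import: counters plus
// one observed pod per distinct error (key type assumed to be the error text).
type ProcessSummary struct {
	TotalPods     int
	SkippedPods   int
	FailedPods    int
	ErrorsForPods map[string][]string
}

// ConcurrentProcessPod sketches the worker-pool helper: `jobs` goroutines
// drain the channel, invoke the callback, and fold each (skip, error)
// result into a shared summary.
func ConcurrentProcessPod(ch <-chan corev1.Pod, jobs uint, f func(p *corev1.Pod) (bool, error)) ProcessSummary {
	summary := ProcessSummary{ErrorsForPods: map[string][]string{}}
	var mu sync.Mutex
	var wg sync.WaitGroup
	wg.Add(int(jobs))
	for i := uint(0); i < jobs; i++ {
		go func() {
			defer wg.Done()
			for pod := range ch {
				skip, err := f(&pod)
				mu.Lock()
				summary.TotalPods++
				if skip {
					summary.SkippedPods++
				}
				if err != nil {
					summary.FailedPods++
					key := err.Error()
					summary.ErrorsForPods[key] = append(summary.ErrorsForPods[key],
						fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return summary
}
```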
52 changes: 42 additions & 10 deletions cmd/importer/pod/check_test.go
@@ -46,7 +46,7 @@ func TestCheckNamespace(t *testing.T) {
pods []corev1.Pod
clusteQueues []kueue.ClusterQueue
localQueues []kueue.LocalQueue
mapping map[string]string
mapping util.MappingRules
flavors []kueue.ResourceFlavor

wantError error
@@ -62,17 +62,33 @@ func TestCheckNamespace(t *testing.T) {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
wantError: util.ErrLQNotFound,
},
"no cluster queue": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
@@ -83,8 +99,16 @@ func TestCheckNamespace(t *testing.T) {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
@@ -98,8 +122,16 @@ func TestCheckNamespace(t *testing.T) {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
mapping: util.MappingRules{
util.MappingRule{
Match: util.MappingMatch{
PriorityClassName: "",
Labels: map[string]string{
testingQueueLabel: "q1",
},
},
ToLocalQueue: "lq1",
},
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
@@ -126,7 +158,7 @@ func TestCheckNamespace(t *testing.T) {
client := builder.Build()
ctx := context.Background()

mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping, nil)
mpc, _ := util.LoadImportCache(ctx, client, []string{testingNamespace}, tc.mapping, nil)
gotErr := Check(ctx, client, mpc, 8)

if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
25 changes: 12 additions & 13 deletions cmd/importer/pod/import.go
@@ -43,34 +43,34 @@ import (
func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
ch := make(chan corev1.Pod)
go func() {
err := util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, ch)
err := util.PushPods(ctx, c, cache.Namespaces, ch)
if err != nil {
ctrl.LoggerFrom(ctx).Error(err, "Listing pods")
}
}()
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) error {
summary := util.ConcurrentProcessPod(ch, jobs, func(p *corev1.Pod) (bool, error) {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Importing")

lq, err := cache.LocalQueue(p)
if err != nil {
return err
lq, skip, err := cache.LocalQueue(p)
if skip || err != nil {
return skip, err
}

oldLq, found := p.Labels[controllerconstants.QueueLabel]
if !found {
if err := addLabels(ctx, c, p, lq.Name, cache.AddLabels); err != nil {
return fmt.Errorf("cannot add queue label: %w", err)
return false, fmt.Errorf("cannot add queue label: %w", err)
}
} else if oldLq != lq.Name {
return fmt.Errorf("another local queue name is set %q expecting %q", oldLq, lq.Name)
return false, fmt.Errorf("another local queue name is set %q expecting %q", oldLq, lq.Name)
}

kp := pod.FromObject(p)
// Note: the recorder is not used for single pods, we can just pass nil for now.
wl, err := kp.ConstructComposableWorkload(ctx, c, nil)
if err != nil {
return fmt.Errorf("construct workload: %w", err)
return false, fmt.Errorf("construct workload: %w", err)
}

maps.Copy(wl.Labels, cache.AddLabels)
@@ -82,12 +82,11 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs
}

if err := createWorkload(ctx, c, wl); err != nil {
return fmt.Errorf("creating workload: %w", err)
return false, fmt.Errorf("creating workload: %w", err)

}

//make its admission and update its status

info := workload.NewInfo(wl)
cq := cache.ClusterQueues[string(lq.Spec.ClusterQueue)]
admission := kueue.Admission{
@@ -122,14 +121,14 @@
}
apimeta.SetStatusCondition(&wl.Status.Conditions, admittedCond)
if err := admitWorkload(ctx, c, wl); err != nil {
return err
return false, err
}
log.V(2).Info("Successfully imported", "pod", klog.KObj(p), "workload", klog.KObj(wl))
return nil
return false, nil
})

log := ctrl.LoggerFrom(ctx)
log.Info("Import done", "checked", summary.TotalPods, "failed", summary.FailedPods)
log.Info("Import done", "checked", summary.TotalPods, "skipped", summary.SkippedPods, "failed", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
log.Info("Import failed for Pods", "err", e, "occurrences", len(pods), "obsevedFirstIn", pods[0])
}
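Both `Check` and `Import` now receive the mapping's skip decision from the cache lookups. A plausible sketch of `cache.LocalQueue`, reusing the rule evaluation sketched earlier; the cache layout and the error sentinels' messages are assumptions (`ErrLQNotFound` does appear in the tests):

```go
package util

import (
	"errors"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// Trimmed view of the commit's ImportCache; the field layout is assumed.
type ImportCache struct {
	MappingRules MappingRules
	LocalQueues  map[string]map[string]*kueue.LocalQueue // namespace -> name -> queue
}

// Sentinels: ErrLQNotFound exists in the tests; the messages are assumed.
var (
	ErrNoMapping  = errors.New("no mapping found")
	ErrLQNotFound = errors.New("localqueue not found")
)

// LocalQueue sketches the lookup used by Check and Import: run the Pod
// through the mapping rules, short-circuit on a skip rule, then resolve
// the resulting name against the LocalQueue cache.
func (c *ImportCache) LocalQueue(p *corev1.Pod) (*kueue.LocalQueue, bool, error) {
	name, skip, found := findLocalQueue(c.MappingRules, p) // see the earlier sketch
	if skip {
		return nil, true, nil
	}
	if !found {
		return nil, false, ErrNoMapping
	}
	lq, found := c.LocalQueues[p.Namespace][name]
	if !found {
		return nil, false, fmt.Errorf("%s/%s: %w", p.Namespace, name, ErrLQNotFound)
	}
	return lq, false, nil
}
```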
