Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Mar 13, 2024
1 parent e7c479a commit bf584f0
Show file tree
Hide file tree
Showing 4 changed files with 445 additions and 0 deletions.
73 changes: 73 additions & 0 deletions cmd/importer/check/check.go
@@ -0,0 +1,73 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package check

import (
"context"
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/kueue/cmd/importer/util"
)

var (
ErrNoMappingKey = errors.New("no mapping key found")
ErrNoMapping = errors.New("no mapping found")
ErrLQNotFound = errors.New("localqueue not found")
ErrCQNotFound = errors.New("clusterqueue not found")
)

func Check(ctx context.Context, c client.Client, mappingCache *util.MappingCache) error {
ch := make(chan corev1.Pod)
go util.PushPods(ctx, c, mappingCache.Namespaces, mappingCache.QueueLabel, ch)
errs := util.ConcurrentProcessPod(ch, func(p *corev1.Pod) error {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Checking")
mappingKey, found := p.Labels[mappingCache.QueueLabel]
if !found {
return fmt.Errorf("%s/%s: %w", p.Namespace, p.Name, ErrNoMappingKey)
}

queueName, found := mappingCache.Mapping[mappingKey]
if !found {
return fmt.Errorf("%s/%s: %w", p.Namespace, p.Name, ErrNoMapping)
}

nqQueues, found := mappingCache.LocalQueues[p.Namespace]
if !found {
return fmt.Errorf("%s/%s queue: %s: %w", p.Namespace, p.Name, queueName, ErrLQNotFound)
}

lq, found := nqQueues[queueName]
if !found {
return fmt.Errorf("%s/%s queue: %s: %w", p.Namespace, p.Name, queueName, ErrLQNotFound)
}

_, found = mappingCache.ClusterQueues[string(lq.Spec.ClusterQueue)]
if !found {
return fmt.Errorf("%s/%s cluster queue: %s: %w", p.Namespace, p.Name, queueName, ErrCQNotFound)
}

return nil
})
return errors.Join(errs...)
}
118 changes: 118 additions & 0 deletions cmd/importer/check/check_test.go
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package check

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/util"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
)

const (
testingNamespace = "ns"
testingQueueLabel = "testing.lbl"
)

func TestCheckNamespace(t *testing.T) {
basePodWrapper := testingpod.MakePod("pod", testingNamespace).
Label(testingQueueLabel, "q1")

baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1")
baseClusterQueue := utiltesting.MakeClusterQueue("cq1")

cases := map[string]struct {
pods []corev1.Pod
clusteQueues []kueue.ClusterQueue
localQueues []kueue.LocalQueue
mapping map[string]string

wantError error
}{
"empty cluster": {},
"no mapping": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
wantError: ErrNoMapping,
},
"no local queue": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
},
wantError: ErrLQNotFound,
},
"no cluster queue": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
},
wantError: ErrCQNotFound,
},
"all found": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
},
clusteQueues: []kueue.ClusterQueue{
*baseClusterQueue.Obj(),
},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
podsList := corev1.PodList{Items: tc.pods}
cqList := kueue.ClusterQueueList{Items: tc.clusteQueues}
lqList := kueue.LocalQueueList{Items: tc.localQueues}

builder := utiltesting.NewClientBuilder()
builder = builder.WithLists(&podsList, &cqList, &lqList)

client := builder.Build()
ctx := context.Background()

mpc, _ := util.LoadMappingCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping)
gotErr := Check(ctx, client, mpc)

if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Unexpected error (-want/+got)\n%s", diff)
}
})
}

}
118 changes: 118 additions & 0 deletions cmd/importer/main.go
@@ -0,0 +1,118 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"sigs.k8s.io/kueue/cmd/importer/check"
"sigs.k8s.io/kueue/cmd/importer/util"
"sigs.k8s.io/kueue/pkg/util/useragent"
)

const (
NamespaceFlag = "namespace"
NamespaceFlagShort = "n"
QueueMappingFlag = "queuemapping"
QueueLabelFlag = "queuelabel"
QPSFlag = "qps"
BurstFlag = "bust"
)

var (
k8sClient client.Client
rootCmd = &cobra.Command{
Use: "importer",
Short: "Import existing (running) objects into Kueue",
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
kubeConfig, err := ctrl.GetConfig()
if err != nil {
return err
}
if kubeConfig.UserAgent == "" {
kubeConfig.UserAgent = useragent.Default()
}
qps, err := cmd.Flags().GetFloat32(QPSFlag)
if err != nil {
return err
}
kubeConfig.QPS = qps
bust, err := cmd.Flags().GetInt(BurstFlag)
if err != nil {
return err
}
kubeConfig.Burst = bust

c, err := client.New(kubeConfig, client.Options{})
if err != nil {
return err
}
k8sClient = c
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
cmd.Flags().Visit(func(f *pflag.Flag) {
fmt.Println(f.Name, f.Value.String())
})
err := checkCmd(cmd, args)
if err != nil {
return err
}
return nil
},
}
)

func init() {
rootCmd.PersistentFlags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "the running namespace")
rootCmd.MarkPersistentFlagRequired(NamespaceFlag)
rootCmd.PersistentFlags().StringToString(QueueMappingFlag, nil, "queuemapping ex. from=to")
rootCmd.PersistentFlags().String(QueueLabelFlag, "", "label used for queue source")
rootCmd.MarkPersistentFlagRequired(QueueLabelFlag)

rootCmd.PersistentFlags().Float32(QPSFlag, 5, "")
rootCmd.PersistentFlags().Int(BurstFlag, 10, "")

}

func main() {
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
err := rootCmd.Execute()
if err != nil {
os.Exit(1)
}
}

func checkCmd(cmd *cobra.Command, _ []string) error {
log := ctrl.Log.WithName("check")
ctx := ctrl.LoggerInto(context.Background(), log)
flags := cmd.Flags()
namespaces, _ := flags.GetStringSlice(NamespaceFlag)
queueLabel, _ := flags.GetString(QueueLabelFlag)
mapping, _ := flags.GetStringToString(QueueMappingFlag)
mappingCache, _ := util.LoadMappingCache(ctx, k8sClient, namespaces, queueLabel, mapping)

return check.Check(ctx, k8sClient, mappingCache)
}

0 comments on commit bf584f0

Please sign in to comment.