Skip to content

Commit

Permalink
[cmd/importer] Initial implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Mar 19, 2024
1 parent d32fe49 commit c86a3de
Show file tree
Hide file tree
Showing 10 changed files with 1,249 additions and 5 deletions.
183 changes: 183 additions & 0 deletions cmd/importer/main.go
@@ -0,0 +1,183 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/pod"
"sigs.k8s.io/kueue/cmd/importer/util"
"sigs.k8s.io/kueue/pkg/util/useragent"
)

// Names (and single-letter shorthands) of the command line flags registered
// on the root command in init.
const (
// NamespaceFlag selects the target namespaces; registered as a required
// string slice flag.
NamespaceFlag = "namespace"
// NamespaceFlagShort is the shorthand for NamespaceFlag.
NamespaceFlagShort = "n"
// QueueMappingFlag holds the mapping from QueueLabelFlag label values to
// local queue names.
QueueMappingFlag = "queuemapping"
// QueueLabelFlag names the label used to identify the target local queue;
// registered as a required flag.
QueueLabelFlag = "queuelabel"
// QPSFlag sets the QPS of the Kubernetes client configuration.
QPSFlag = "qps"
// BurstFlag sets the Burst of the Kubernetes client configuration.
BurstFlag = "burst"
// VerbosityFlag is a count flag; repeating it increases the log verbosity.
VerbosityFlag = "verbose"
// VerboseFlagShort is the shorthand for VerbosityFlag.
VerboseFlagShort = "v"
// JobsFlag sets the number of concurrent jobs.
JobsFlag = "jobs"
// JobsFlagShort is the shorthand for JobsFlag.
JobsFlagShort = "j"
)

var (
	// k8sClient is the API client shared by the subcommands. It is assigned
	// by rootCmd's PersistentPreRunE and is nil until that hook has run.
	k8sClient client.Client

	// rootCmd is the entry point of the importer CLI. Its PersistentPreRunE
	// hook configures logging and builds k8sClient from the command line
	// flags; running the root command without a subcommand behaves like
	// "check".
	rootCmd = &cobra.Command{
		Use:   "importer",
		Short: "Import existing (running) objects into Kueue",
		PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
			v, err := cmd.Flags().GetCount(VerbosityFlag)
			if err != nil {
				return err
			}
			// Without any -v the level is -1; every additional -v lowers it
			// by one, which enables more verbose output.
			level := (v + 1) * -1
			ctrl.SetLogger(zap.New(
				zap.UseDevMode(true),
				zap.ConsoleEncoder(),
				zap.Level(zapcore.Level(level)),
			))

			kubeConfig, err := ctrl.GetConfig()
			if err != nil {
				return err
			}
			// Only set a user agent when the loaded config does not carry one.
			if kubeConfig.UserAgent == "" {
				kubeConfig.UserAgent = useragent.Default()
			}

			qps, err := cmd.Flags().GetFloat32(QPSFlag)
			if err != nil {
				return err
			}
			kubeConfig.QPS = qps

			burst, err := cmd.Flags().GetInt(BurstFlag)
			if err != nil {
				return err
			}
			kubeConfig.Burst = burst

			// The Kueue types must be registered before the client is built,
			// since the client is created with this scheme.
			if err := kueue.AddToScheme(scheme.Scheme); err != nil {
				return err
			}

			c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme})
			if err != nil {
				return err
			}
			k8sClient = c
			return nil
		},
		RunE: func(cmd *cobra.Command, args []string) error {
			// Echo the name and value of every flag visited by Visit.
			cmd.Flags().Visit(func(f *pflag.Flag) {
				fmt.Println(f.Name, f.Value.String())
			})
			return checkCmd(cmd, args)
		},
	}
)

// init registers the persistent flags shared by all commands and attaches the
// "check" and "import" subcommands to rootCmd.
//
// It panics if a flag cannot be marked as required. That only happens when
// the flag name is not registered, which is a programming error that should
// surface at start-up rather than silently leave the flag optional.
func init() {
	flags := rootCmd.PersistentFlags()

	flags.StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)")
	if err := rootCmd.MarkPersistentFlagRequired(NamespaceFlag); err != nil {
		panic(err)
	}

	flags.String(QueueLabelFlag, "", "label used to identify the target local queue")
	flags.StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names")
	if err := rootCmd.MarkPersistentFlagRequired(QueueLabelFlag); err != nil {
		panic(err)
	}

	flags.Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
	flags.Int(BurstFlag, 50, "client Burst, https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
	flags.UintP(JobsFlag, JobsFlagShort, 8, "number of concurrent jobs")

	flags.CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)")

	rootCmd.AddCommand(
		&cobra.Command{
			Use:  "check",
			RunE: checkCmd,
		},
		&cobra.Command{
			Use:  "import",
			RunE: importCmd,
		},
	)
}

// main runs the root command and exits with status 1 when it reports an
// error.
func main() {
	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}

// loadMappingCache reads the namespace, queue label and queue mapping flags
// from cmd and hands them, together with the shared k8sClient, to
// util.LoadImportCache. The first flag lookup that fails aborts the call and
// its error is returned as is.
func loadMappingCache(ctx context.Context, cmd *cobra.Command) (*util.ImportCache, error) {
	fs := cmd.Flags()

	targetNamespaces, err := fs.GetStringSlice(NamespaceFlag)
	if err != nil {
		return nil, err
	}

	label, err := fs.GetString(QueueLabelFlag)
	if err != nil {
		return nil, err
	}

	queueMapping, err := fs.GetStringToString(QueueMappingFlag)
	if err != nil {
		return nil, err
	}

	return util.LoadImportCache(ctx, k8sClient, targetNamespaces, label, queueMapping)
}

// checkCmd implements the "check" command: it loads the import cache from the
// command line flags and runs pod.Check against it with the requested number
// of concurrent jobs.
//
// It returns the first error hit while reading the flags or loading the
// cache, otherwise the result of pod.Check.
func checkCmd(cmd *cobra.Command, _ []string) error {
	log := ctrl.Log.WithName("check")
	ctx := ctrl.LoggerInto(context.Background(), log)

	// Report a failed lookup instead of silently continuing with zero jobs.
	jobs, err := cmd.Flags().GetUint(JobsFlag)
	if err != nil {
		return err
	}

	cache, err := loadMappingCache(ctx, cmd)
	if err != nil {
		return err
	}

	return pod.Check(ctx, k8sClient, cache, jobs)
}

// importCmd implements the "import" command: it loads the import cache from
// the command line flags, runs pod.Check and, only if the check succeeds,
// runs pod.Import with the same cache and number of concurrent jobs.
//
// It returns the first error hit while reading the flags, loading the cache
// or checking, otherwise the result of pod.Import.
func importCmd(cmd *cobra.Command, _ []string) error {
	log := ctrl.Log.WithName("import")
	ctx := ctrl.LoggerInto(context.Background(), log)

	// Report a failed lookup instead of silently continuing with zero jobs.
	jobs, err := cmd.Flags().GetUint(JobsFlag)
	if err != nil {
		return err
	}

	cache, err := loadMappingCache(ctx, cmd)
	if err != nil {
		return err
	}

	// Nothing is imported unless the check passes.
	if err := pod.Check(ctx, k8sClient, cache, jobs); err != nil {
		return err
	}

	return pod.Import(ctx, k8sClient, cache, jobs)
}
69 changes: 69 additions & 0 deletions cmd/importer/pod/check.go
@@ -0,0 +1,69 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"context"
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/kueue/cmd/importer/util"
)

// Check runs a per-pod validation over the pods delivered by util.PushPods,
// processing them through util.ConcurrentProcessPod with the given number of
// jobs.
//
// For every pod the cluster queue is resolved through cache.ClusterQueue. The
// pod fails the check when that lookup fails, when the cluster queue has no
// resource groups, when its first resource group has no flavors, or when the
// first flavor of that group is not present in the cache.
//
// A summary (total, failed, and the distinct errors with the first pod each
// was observed for) is printed to standard output. The returned error joins
// all the errors recorded in the summary and is nil when there are none.
func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs uint) error {
	podCh := make(chan corev1.Pod)
	// The producer runs in its own goroutine and is handed podCh, which the
	// workers below consume.
	go util.PushPods(ctx, c, cache.Namespaces, cache.QueueLabel, podCh)

	checkPod := func(p *corev1.Pod) error {
		ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p)).V(3).Info("Checking")

		clusterQueue, err := cache.ClusterQueue(p)
		if err != nil {
			return err
		}

		groups := clusterQueue.Spec.ResourceGroups
		switch {
		case len(groups) == 0:
			return fmt.Errorf("%q has no resource groups: %w", clusterQueue.Name, util.ErrCQInvalid)
		case len(groups[0].Flavors) == 0:
			return fmt.Errorf("%q has no resource groups flavors: %w", clusterQueue.Name, util.ErrCQInvalid)
		}

		// Only the first flavor of the first resource group is validated.
		flavorName := string(groups[0].Flavors[0].Name)
		if _, known := cache.ResourceFalvors[flavorName]; known {
			// do some additional checks like:
			// - (maybe) the resources managed by the queues
			return nil
		}
		return fmt.Errorf("%q flavor %q: %w", clusterQueue.Name, flavorName, util.ErrCQInvalid)
	}

	summary := util.ConcurrentProcessPod(podCh, jobs, checkPod)

	fmt.Printf("Checked %d pods\n", summary.TotalPods)
	fmt.Printf("Failed %d pods\n", summary.FailedPods)
	for reason, affected := range summary.ErrorsForPods {
		fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(affected), reason, affected[0])
	}

	return errors.Join(summary.Errors...)
}
137 changes: 137 additions & 0 deletions cmd/importer/pod/check_test.go
@@ -0,0 +1,137 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/util"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
)

// Fixture values shared by the test cases in this file.
const (
// testingNamespace is the namespace the test pods and local queues are
// created in, and the only namespace handed to util.LoadImportCache.
testingNamespace = "ns"
// testingQueueLabel is the pod label used as the queue label in the tests.
testingQueueLabel = "testing.lbl"
)

// TestCheckNamespace runs Check against a fake client populated with
// different combinations of pods, queues and resource flavors, and verifies
// that the returned error matches the expected sentinel error (or is nil).
func TestCheckNamespace(t *testing.T) {
	basePodWrapper := testingpod.MakePod("pod", testingNamespace).
		Label(testingQueueLabel, "q1")

	baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1")
	baseClusterQueue := utiltesting.MakeClusterQueue("cq1")

	// testCase describes the objects present in the cluster, the queue
	// mapping handed to the import cache and the expected outcome.
	type testCase struct {
		pods          []corev1.Pod
		clusterQueues []kueue.ClusterQueue
		localQueues   []kueue.LocalQueue
		mapping       map[string]string
		flavors       []kueue.ResourceFlavor

		wantError error
	}

	testCases := map[string]testCase{
		"empty cluster": {},
		"no mapping": {
			pods:      []corev1.Pod{*basePodWrapper.Clone().Obj()},
			wantError: util.ErrNoMapping,
		},
		"no local queue": {
			pods:      []corev1.Pod{*basePodWrapper.Clone().Obj()},
			mapping:   map[string]string{"q1": "lq1"},
			wantError: util.ErrLQNotFound,
		},
		"no cluster queue": {
			pods:        []corev1.Pod{*basePodWrapper.Clone().Obj()},
			mapping:     map[string]string{"q1": "lq1"},
			localQueues: []kueue.LocalQueue{*baseLocalQueue.Obj()},
			wantError:   util.ErrCQNotFound,
		},
		"invalid cq": {
			pods:          []corev1.Pod{*basePodWrapper.Clone().Obj()},
			mapping:       map[string]string{"q1": "lq1"},
			localQueues:   []kueue.LocalQueue{*baseLocalQueue.Obj()},
			clusterQueues: []kueue.ClusterQueue{*baseClusterQueue.Obj()},
			wantError:     util.ErrCQInvalid,
		},
		"all found": {
			pods:        []corev1.Pod{*basePodWrapper.Clone().Obj()},
			mapping:     map[string]string{"q1": "lq1"},
			localQueues: []kueue.LocalQueue{*baseLocalQueue.Obj()},
			clusterQueues: []kueue.ClusterQueue{
				*utiltesting.MakeClusterQueue("cq1").ResourceGroup(*utiltesting.MakeFlavorQuotas("rf1").Resource(corev1.ResourceCPU, "1").Obj()).Obj(),
			},
			flavors: []kueue.ResourceFlavor{
				*utiltesting.MakeResourceFlavor("rf1").Obj(),
			},
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			ctx := context.Background()

			// Seed the fake client with every object of the test case.
			cl := utiltesting.NewClientBuilder().
				WithLists(
					&corev1.PodList{Items: tc.pods},
					&kueue.ClusterQueueList{Items: tc.clusterQueues},
					&kueue.LocalQueueList{Items: tc.localQueues},
					&kueue.ResourceFlavorList{Items: tc.flavors},
				).
				Build()

			importCache, _ := util.LoadImportCache(ctx, cl, []string{testingNamespace}, testingQueueLabel, tc.mapping)
			gotErr := Check(ctx, cl, importCache, 8)

			if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
				t.Errorf("Unexpected error (-want/+got)\n%s", diff)
			}
		})
	}
}

0 comments on commit c86a3de

Please sign in to comment.