Skip to content

Commit

Permalink
[cmd/importer] Initial implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Mar 18, 2024
1 parent c49cfd6 commit 00aa28a
Show file tree
Hide file tree
Showing 6 changed files with 939 additions and 0 deletions.
179 changes: 179 additions & 0 deletions cmd/importer/main.go
@@ -0,0 +1,179 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/pod"
"sigs.k8s.io/kueue/cmd/importer/util"
"sigs.k8s.io/kueue/pkg/util/useragent"
)

const (
NamespaceFlag = "namespace"
NamespaceFlagShort = "n"
QueueMappingFlag = "queuemapping"
QueueLabelFlag = "queuelabel"
QPSFlag = "qps"
BurstFlag = "burst"
VerbosityFlag = "verbose"
VerboseFlagShort = "v"
)

var (
k8sClient client.Client
rootCmd = &cobra.Command{
Use: "importer",
Short: "Import existing (running) objects into Kueue",
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
v, _ := cmd.Flags().GetCount(VerbosityFlag)
level := (v + 1) * -1
ctrl.SetLogger(zap.New(
zap.UseDevMode(true),
zap.ConsoleEncoder(),
zap.Level(zapcore.Level(level)),
))

kubeConfig, err := ctrl.GetConfig()
if err != nil {
return err
}
if kubeConfig.UserAgent == "" {
kubeConfig.UserAgent = useragent.Default()
}
qps, err := cmd.Flags().GetFloat32(QPSFlag)
if err != nil {
return err
}
kubeConfig.QPS = qps
bust, err := cmd.Flags().GetInt(BurstFlag)
if err != nil {
return err
}
kubeConfig.Burst = bust

if err := kueue.AddToScheme(scheme.Scheme); err != nil {
return err
}

c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return err
}
k8sClient = c
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
cmd.Flags().Visit(func(f *pflag.Flag) {
fmt.Println(f.Name, f.Value.String())
})
err := checkCmd(cmd, args)
if err != nil {
return err
}
return nil
},
}
)

func init() {
rootCmd.PersistentFlags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)")
rootCmd.MarkPersistentFlagRequired(NamespaceFlag)

rootCmd.PersistentFlags().String(QueueLabelFlag, "", "label used to identify the target local queue")
rootCmd.PersistentFlags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names")
rootCmd.MarkPersistentFlagRequired(QueueLabelFlag)

rootCmd.PersistentFlags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
rootCmd.PersistentFlags().Int(BurstFlag, 50, "client Burst, https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")

rootCmd.PersistentFlags().CountP(VerbosityFlag, VerboseFlagShort, "verbosity (specify multiple times to increase the log level)")

rootCmd.AddCommand(
&cobra.Command{
Use: "check",
RunE: checkCmd,
},
&cobra.Command{
Use: "import",
RunE: importCmd,
},
)
}

func main() {
err := rootCmd.Execute()
if err != nil {
os.Exit(1)
}
}

func loadMappingCache(ctx context.Context, cmd *cobra.Command) (*util.MappingCache, error) {
flags := cmd.Flags()
namespaces, err := flags.GetStringSlice(NamespaceFlag)
if err != nil {
return nil, err
}
queueLabel, err := flags.GetString(QueueLabelFlag)
if err != nil {
return nil, err
}
mapping, err := flags.GetStringToString(QueueMappingFlag)
if err != nil {
return nil, err
}
return util.LoadMappingCache(ctx, k8sClient, namespaces, queueLabel, mapping)
}

func checkCmd(cmd *cobra.Command, _ []string) error {
log := ctrl.Log.WithName("check")
ctx := ctrl.LoggerInto(context.Background(), log)
mappingCache, err := loadMappingCache(ctx, cmd)
if err != nil {
return err
}

return pod.Check(ctx, k8sClient, mappingCache)
}

func importCmd(cmd *cobra.Command, _ []string) error {
log := ctrl.Log.WithName("import")
ctx := ctrl.LoggerInto(context.Background(), log)

mappingCache, err := loadMappingCache(ctx, cmd)
if err != nil {
return err
}

if err = pod.Check(ctx, k8sClient, mappingCache); err != nil {
return err
}

return pod.Import(ctx, k8sClient, mappingCache)
}
62 changes: 62 additions & 0 deletions cmd/importer/pod/check.go
@@ -0,0 +1,62 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"context"
"errors"
"fmt"

corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/util"
)

func Check(ctx context.Context, c client.Client, mappingCache *util.MappingCache) error {
ch := make(chan corev1.Pod)
go util.PushPods(ctx, c, mappingCache.Namespaces, mappingCache.QueueLabel, ch)
summary := util.ConcurrentProcessPod(ch, func(p *corev1.Pod) error {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Checking")

cq, err := mappingCache.ClusterQueue(p)
if err != nil {
return err
}

if !apimeta.IsStatusConditionTrue(cq.Status.Conditions, kueue.ClusterQueueActive) {
return fmt.Errorf("%q: %w", cq.Name, util.ErrCQNotFound)
}

// do some additional checks like:
// - (maybe) the resources managed by the queues

return nil
})

fmt.Printf("Checked %d pods\n", summary.TotalPods)
fmt.Printf("Failed %d pods\n", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0])
}
return errors.Join(summary.Errors...)
}
117 changes: 117 additions & 0 deletions cmd/importer/pod/check_test.go
@@ -0,0 +1,117 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pod

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
corev1 "k8s.io/api/core/v1"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/util"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
)

const (
testingNamespace = "ns"
testingQueueLabel = "testing.lbl"
)

func TestCheckNamespace(t *testing.T) {
basePodWrapper := testingpod.MakePod("pod", testingNamespace).
Label(testingQueueLabel, "q1")

baseLocalQueue := utiltesting.MakeLocalQueue("lq1", testingNamespace).ClusterQueue("cq1")
baseClusterQueue := utiltesting.MakeClusterQueue("cq1")

cases := map[string]struct {
pods []corev1.Pod
clusteQueues []kueue.ClusterQueue
localQueues []kueue.LocalQueue
mapping map[string]string

wantError error
}{
"empty cluster": {},
"no mapping": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
wantError: util.ErrNoMapping,
},
"no local queue": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
},
wantError: util.ErrLQNotFound,
},
"no cluster queue": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
},
wantError: util.ErrCQNotFound,
},
"all found": {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
mapping: map[string]string{
"q1": "lq1",
},
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
},
clusteQueues: []kueue.ClusterQueue{
*baseClusterQueue.Obj(),
},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
podsList := corev1.PodList{Items: tc.pods}
cqList := kueue.ClusterQueueList{Items: tc.clusteQueues}
lqList := kueue.LocalQueueList{Items: tc.localQueues}

builder := utiltesting.NewClientBuilder()
builder = builder.WithLists(&podsList, &cqList, &lqList)

client := builder.Build()
ctx := context.Background()

mpc, _ := util.LoadMappingCache(ctx, client, []string{testingNamespace}, testingQueueLabel, tc.mapping)
gotErr := Check(ctx, client, mpc)

if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" {
t.Errorf("Unexpected error (-want/+got)\n%s", diff)
}
})
}
}

0 comments on commit 00aa28a

Please sign in to comment.