Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Mar 15, 2024
1 parent bf584f0 commit f545a9f
Show file tree
Hide file tree
Showing 9 changed files with 540 additions and 73 deletions.
68 changes: 58 additions & 10 deletions cmd/importer/main.go
Expand Up @@ -23,11 +23,13 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"sigs.k8s.io/kueue/cmd/importer/check"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/cmd/importer/pod"
"sigs.k8s.io/kueue/cmd/importer/util"
"sigs.k8s.io/kueue/pkg/util/useragent"
)
Expand All @@ -38,7 +40,7 @@ const (
QueueMappingFlag = "queuemapping"
QueueLabelFlag = "queuelabel"
QPSFlag = "qps"
BurstFlag = "bust"
BurstFlag = "burst"
)

var (
Expand All @@ -65,7 +67,11 @@ var (
}
kubeConfig.Burst = bust

c, err := client.New(kubeConfig, client.Options{})
if err := kueue.AddToScheme(scheme.Scheme); err != nil {
return err
}

c, err := client.New(kubeConfig, client.Options{Scheme: scheme.Scheme})
if err != nil {
return err
}
Expand Down Expand Up @@ -95,24 +101,66 @@ func init() {
rootCmd.PersistentFlags().Float32(QPSFlag, 5, "")
rootCmd.PersistentFlags().Int(BurstFlag, 10, "")

rootCmd.AddCommand(
&cobra.Command{
Use: "check",
RunE: checkCmd,
},
&cobra.Command{
Use: "import",
RunE: importCmd,
},
)
}

// main installs the controller-runtime global logger and runs the root
// command. The process exits with status 1 when the command returns an error.
func main() {
	// Configure the logger exactly once: development-mode zap with the
	// console encoder.
	ctrl.SetLogger(zap.New(zap.UseDevMode(true), zap.ConsoleEncoder()))
	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}

// loadMappingCache reads the namespace, queue-label and queue-mapping flags
// from cmd and hands them, together with the package-level k8sClient, to
// util.LoadMappingCache. The first flag lookup that fails aborts the call and
// its error is returned unchanged.
func loadMappingCache(ctx context.Context, cmd *cobra.Command) (*util.MappingCache, error) {
	fs := cmd.Flags()

	var (
		nsList   []string
		labelKey string
		queueMap map[string]string
		err      error
	)

	if nsList, err = fs.GetStringSlice(NamespaceFlag); err != nil {
		return nil, err
	}
	if labelKey, err = fs.GetString(QueueLabelFlag); err != nil {
		return nil, err
	}
	if queueMap, err = fs.GetStringToString(QueueMappingFlag); err != nil {
		return nil, err
	}

	return util.LoadMappingCache(ctx, k8sClient, nsList, labelKey, queueMap)
}

// checkCmd is the RunE handler of the "check" subcommand. It builds the
// mapping cache from the command's flags and runs pod.Check against it.
//
// Any error from loading the cache or from the check itself is returned to
// cobra instead of being discarded.
func checkCmd(cmd *cobra.Command, _ []string) error {
	log := ctrl.Log.WithName("check")
	ctx := ctrl.LoggerInto(context.Background(), log)

	// Single, error-checked load of the cache; flag parsing lives in
	// loadMappingCache so that check and import share it.
	mappingCache, err := loadMappingCache(ctx, cmd)
	if err != nil {
		return err
	}

	return pod.Check(ctx, k8sClient, mappingCache)
}

// importCmd is the RunE handler of the "import" subcommand. It builds the
// mapping cache from the command's flags, runs pod.Check as a pre-flight
// validation, and only when that succeeds calls pod.Import.
func importCmd(cmd *cobra.Command, _ []string) error {
	log := ctrl.Log.WithName("import")
	ctx := ctrl.LoggerInto(context.Background(), log)

	mappingCache, err := loadMappingCache(ctx, cmd)
	if err != nil {
		return err
	}

	// Refuse to import anything if the check reports a problem.
	if err = pod.Check(ctx, k8sClient, mappingCache); err != nil {
		return err
	}

	return pod.Import(ctx, k8sClient, mappingCache)
}
51 changes: 20 additions & 31 deletions cmd/importer/check/check.go → cmd/importer/pod/check.go
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package check
package pod

import (
"context"
Expand All @@ -29,45 +29,34 @@ import (
"sigs.k8s.io/kueue/cmd/importer/util"
)

var (
ErrNoMappingKey = errors.New("no mapping key found")
ErrNoMapping = errors.New("no mapping found")
ErrLQNotFound = errors.New("localqueue not found")
ErrCQNotFound = errors.New("clusterqueue not found")
)

func Check(ctx context.Context, c client.Client, mappingCache *util.MappingCache) error {
ch := make(chan corev1.Pod)
go util.PushPods(ctx, c, mappingCache.Namespaces, mappingCache.QueueLabel, ch)
errs := util.ConcurrentProcessPod(ch, func(p *corev1.Pod) error {
summary := util.ConcurrentProcessPod(ch, func(p *corev1.Pod) error {
log := ctrl.LoggerFrom(ctx).WithValues("pod", klog.KObj(p))
log.V(3).Info("Checking")
mappingKey, found := p.Labels[mappingCache.QueueLabel]
if !found {
return fmt.Errorf("%s/%s: %w", p.Namespace, p.Name, ErrNoMappingKey)
}

queueName, found := mappingCache.Mapping[mappingKey]
if !found {
return fmt.Errorf("%s/%s: %w", p.Namespace, p.Name, ErrNoMapping)
}

nqQueues, found := mappingCache.LocalQueues[p.Namespace]
if !found {
return fmt.Errorf("%s/%s queue: %s: %w", p.Namespace, p.Name, queueName, ErrLQNotFound)
}

lq, found := nqQueues[queueName]
if !found {
return fmt.Errorf("%s/%s queue: %s: %w", p.Namespace, p.Name, queueName, ErrLQNotFound)
}

_, found = mappingCache.ClusterQueues[string(lq.Spec.ClusterQueue)]
if !found {
return fmt.Errorf("%s/%s cluster queue: %s: %w", p.Namespace, p.Name, queueName, ErrCQNotFound)
_, err := mappingCache.ClusterQueue(p)
if err != nil {
return err
}
// do some additional checks like:
// - the preemption policies set in the cq's
// - (maybe) the resources managed by the queues
// - should have one and only one flavor
// - the flavor exists

return nil
})

fmt.Printf("Checked %d pods\n", summary.TotalPods)
fmt.Printf("Failed %d pods\n", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0])
}
var errs []error
for _, e := range summary.Errors {
errs = append(errs, e)
}
return errors.Join(errs...)
}
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package check
package pod

import (
"context"
Expand Down Expand Up @@ -55,7 +55,7 @@ func TestCheckNamespace(t *testing.T) {
pods: []corev1.Pod{
*basePodWrapper.Clone().Obj(),
},
wantError: ErrNoMapping,
wantError: util.ErrNoMapping,
},
"no local queue": {
pods: []corev1.Pod{
Expand All @@ -64,7 +64,7 @@ func TestCheckNamespace(t *testing.T) {
mapping: map[string]string{
"q1": "lq1",
},
wantError: ErrLQNotFound,
wantError: util.ErrLQNotFound,
},
"no cluster queue": {
pods: []corev1.Pod{
Expand All @@ -76,7 +76,7 @@ func TestCheckNamespace(t *testing.T) {
localQueues: []kueue.LocalQueue{
*baseLocalQueue.Obj(),
},
wantError: ErrCQNotFound,
wantError: util.ErrCQNotFound,
},
"all found": {
pods: []corev1.Pod{
Expand Down Expand Up @@ -114,5 +114,4 @@ func TestCheckNamespace(t *testing.T) {
}
})
}

}

0 comments on commit f545a9f

Please sign in to comment.