
Add pod importer #1825

Merged 8 commits on Mar 20, 2024
Changes from 1 commit
4 changes: 3 additions & 1 deletion cmd/importer/README.md
@@ -6,6 +6,8 @@ A tool able to import existing pods into kueue.

The importer should run in a cluster that has the Kueue CRDs defined and in which the `kueue-controller-manager` is either not running or has the `pod` integration disabled.

For an import to succeed, all the involved Kueue objects (LocalQueues, ClusterQueues and ResourceFlavors) need to exist in the cluster; the importer's check stage verifies this and enumerates any missing objects.

## Build

From kueue source root run:
@@ -37,4 +39,4 @@ After which, if `--dry-run=false` was specified, for each selected Pod the impor
```bash
./bin/importer import -n ns1,ns2 --queuelabel=src.lbl --queuemapping=src-val=user-queue,src-val=user-queue --dry-run=false
```

> **Contributor:** Why does `queuemapping` have two identical parameters with the `src-val` value, which was not specified in `--queuelabel`?
>
> **Contributor (author):** fixed
Will import all the pods in namespace `ns1` or `ns2` having the label `src.lbl` set.
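
To illustrate what the check stage described above has to verify, here is a minimal, hypothetical sketch of resolving a pod's queue chain; this is not the importer's actual check code, the names are illustrative, and the client's scheme is assumed to include the kueue `v1beta1` types:

```go
package main

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// checkQueueChain verifies that the target LocalQueue, its ClusterQueue and the
// referenced ResourceFlavors all exist, reporting whichever object is missing.
func checkQueueChain(ctx context.Context, c client.Client, namespace, localQueue string) error {
	lq := &kueue.LocalQueue{}
	if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: localQueue}, lq); err != nil {
		return fmt.Errorf("localqueue %q: %w", localQueue, err)
	}

	cq := &kueue.ClusterQueue{}
	if err := c.Get(ctx, client.ObjectKey{Name: string(lq.Spec.ClusterQueue)}, cq); err != nil {
		return fmt.Errorf("clusterqueue %q: %w", lq.Spec.ClusterQueue, err)
	}

	// Every flavor referenced by the ClusterQueue's resource groups must exist.
	for _, rg := range cq.Spec.ResourceGroups {
		for _, fq := range rg.Flavors {
			rf := &kueue.ResourceFlavor{}
			if err := c.Get(ctx, client.ObjectKey{Name: string(fq.Name)}, rf); err != nil {
				return fmt.Errorf("resourceflavor %q: %w", fq.Name, err)
			}
		}
	}
	return nil
}
```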
2 changes: 1 addition & 1 deletion cmd/importer/main.go
@@ -75,7 +75,7 @@ func setFlags(cmd *cobra.Command) {
cmd.Flags().StringSliceP(NamespaceFlag, NamespaceFlagShort, nil, "target namespaces (at least one should be provided)")
cmd.Flags().String(QueueLabelFlag, "", "label used to identify the target local queue")
cmd.Flags().StringToString(QueueMappingFlag, nil, "mapping from \""+QueueLabelFlag+"\" label values to local queue names")
cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods")
cmd.Flags().StringToString(AddLabelsFlag, nil, "additional label=value pairs to be added to the imported pods and created workloads")
cmd.Flags().String(QueueMappingFileFlag, "", "yaml file containing extra mappings from \""+QueueLabelFlag+"\" label values to local queue names")
cmd.Flags().Float32(QPSFlag, 50, "client QPS, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
cmd.Flags().Int(BurstFlag, 50, "client Burst, as described in https://kubernetes.io/docs/reference/config-api/apiserver-eventratelimit.v1alpha1/#eventratelimit-admission-k8s-io-v1alpha1-Limit")
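
For context on the `--qps`/`--burst` flags above: a sketch, assuming standard client-go/controller-runtime usage rather than code from this PR, of how such values are typically applied to the client used for a bulk import:

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// newClient is illustrative only; the importer's real wiring may differ.
func newClient(qps float32, burst int) (client.Client, error) {
	cfg, err := ctrl.GetConfig()
	if err != nil {
		return nil, err
	}
	// Raising QPS/Burst relaxes the default client-side rate limits, which
	// matters when importing many pods in a single run.
	cfg.QPS = qps
	cfg.Burst = burst
	return client.New(cfg, client.Options{})
}
```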
4 changes: 2 additions & 2 deletions cmd/importer/pod/check.go
@@ -65,8 +65,8 @@ func Check(ctx context.Context, c client.Client, cache *util.ImportCache, jobs u
return nil
})

fmt.Printf("Checked %d pods\n", summary.TotalPods)
fmt.Printf("Failed %d pods\n", summary.FailedPods)
log := ctrl.LoggerFrom(ctx)
log.Info("Check done", "checked", summary.TotalPods, "failed", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
fmt.Printf("%dx: %s\n\t Observed first for pod %q\n", len(pods), e, pods[0])
}
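
The switch from `fmt.Printf` to `ctrl.LoggerFrom(ctx)` assumes a logger has been attached to the context earlier. A minimal sketch of that wiring, an assumption about the setup rather than code from this diff:

```go
package main

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func withLogger(ctx context.Context) context.Context {
	logger := zap.New(zap.UseDevMode(true))
	ctrl.SetLogger(logger)
	// ctrl.LoggerFrom(ctx) in Check/Import will now return this logger.
	return ctrl.LoggerInto(ctx, logger)
}
```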
35 changes: 17 additions & 18 deletions cmd/importer/pod/import.go
@@ -73,6 +73,8 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs
return fmt.Errorf("construct workload: %w", err)
}

maps.Copy(wl.Labels, cache.AddLabels)

if pc, found := cache.PriorityClasses[p.Spec.PriorityClassName]; found {
wl.Spec.PriorityClassName = pc.Name
wl.Spec.Priority = &pc.Value
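
A quick note on the `maps.Copy(wl.Labels, cache.AddLabels)` call above: `maps.Copy` (Go 1.21+) overwrites existing keys in the destination, and it panics if the destination map is nil while there is anything to copy, so the workload constructed earlier is assumed to have a non-nil `Labels` map. A tiny standalone illustration:

```go
package main

import (
	"fmt"
	"maps"
)

func main() {
	wlLabels := map[string]string{"app": "demo"}
	addLabels := map[string]string{"new.lbl": "val", "app": "override"}

	// Keys from addLabels win over existing keys in wlLabels.
	maps.Copy(wlLabels, addLabels)
	fmt.Println(wlLabels) // map[app:override new.lbl:val]
}
```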
@@ -122,8 +124,8 @@ func Import(ctx context.Context, c client.Client, cache *util.ImportCache, jobs
return admitWorkload(ctx, c, wl)
})

fmt.Printf("Checked %d pods\n", summary.TotalPods)
fmt.Printf("Failed %d pods\n", summary.FailedPods)
log := ctrl.LoggerFrom(ctx)
log.Info("Import done", "checked", summary.TotalPods, "failed", summary.FailedPods)
for e, pods := range summary.ErrorsForPods {
fmt.Printf("%dx: %s\n\t%v", len(pods), e, pods)
}
@@ -148,28 +150,28 @@ func addLabels(ctx context.Context, c client.Client, p *corev1.Pod, queue string
maps.Copy(p.Labels, addLabels)

err := c.Update(ctx, p)
retry, reload, timeouut := checkError(err)
retry, reload, timeout := checkError(err)

for retry {
if timeouut >= 0 {
if timeout >= 0 {
select {
case <-ctx.Done():
return errors.New("context canceled")
case <-time.After(timeouut):
case <-time.After(timeout):
}
}
if reload {
err = c.Get(ctx, client.ObjectKeyFromObject(p), p)
if err != nil {
retry, reload, timeouut = checkError(err)
retry, reload, timeout = checkError(err)
continue
}
p.Labels[controllerconstants.QueueLabel] = queue
p.Labels[pod.ManagedLabelKey] = pod.ManagedLabelValue
maps.Copy(p.Labels, addLabels)
}
err = c.Update(ctx, p)
retry, reload, timeouut = checkError(err)
retry, reload, timeout = checkError(err)
}
return err
}
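
`checkError` itself is outside the hunks shown here. To make the retry loops above easier to follow, here is a hypothetical classifier consistent with how its three results are used (`retry` drives the loop, `reload` triggers a re-read of the object, and a negative `timeout` skips the wait); the real implementation may differ:

```go
package pod

import (
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// checkError sketch (assumption, not the code from this PR).
func checkError(err error) (retry bool, reload bool, timeout time.Duration) {
	if err == nil {
		return false, false, 0
	}
	// A conflict means our copy of the object is stale: re-read it and retry immediately.
	if apierrors.IsConflict(err) {
		return true, true, -1
	}
	// Honor server-suggested delays (e.g. 429 Too Many Requests).
	if delay, ok := apierrors.SuggestsClientDelay(err); ok {
		return true, false, time.Duration(delay) * time.Second
	}
	if apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) {
		return true, false, 100 * time.Millisecond
	}
	return false, false, 0
}
```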
@@ -179,37 +181,34 @@ func createWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) er
if apierrors.IsAlreadyExists(err) {
return nil
}
retry, _, timeouut := checkError(err)
retry, _, timeout := checkError(err)
for retry {
if timeouut >= 0 {
if timeout >= 0 {
select {
case <-ctx.Done():
return errors.New("context canceled")
case <-time.After(timeouut):
case <-time.After(timeout):
}
}
err = c.Create(ctx, wl)
retry, _, timeouut = checkError(err)
retry, _, timeout = checkError(err)
}
return err
}

func admitWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) error {
err := workload.ApplyAdmissionStatus(ctx, c, wl, false)
if apierrors.IsAlreadyExists(err) {
return nil
}
retry, _, timeouut := checkError(err)
retry, _, timeout := checkError(err)
for retry {
if timeouut >= 0 {
if timeout >= 0 {
select {
case <-ctx.Done():
return errors.New("context canceled")
case <-time.After(timeouut):
case <-time.After(timeout):
}
}
err = workload.ApplyAdmissionStatus(ctx, c, wl, false)
retry, _, timeouut = checkError(err)
retry, _, timeout = checkError(err)
}
return err
}
4 changes: 3 additions & 1 deletion cmd/importer/pod/import_test.go
@@ -145,7 +145,9 @@ func TestImportNamespace(t *testing.T) {
},

wantWorkloads: []kueue.Workload{
*baseWlWrapper.Clone().Obj(),
*baseWlWrapper.Clone().
Label("new.lbl", "val").
Obj(),
},
},
}
12 changes: 8 additions & 4 deletions cmd/importer/util/util.go
@@ -26,6 +26,7 @@ import (

corev1 "k8s.io/api/core/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -45,7 +46,7 @@ var (
ErrCQInvalid = errors.New("clusterqueue invalid")
)

func ListOptions(namespace, queueLabel, continueToken string) []client.ListOption {
func listOptions(namespace, queueLabel, continueToken string) []client.ListOption {
opts := []client.ListOption{
client.InNamespace(namespace),
client.HasLabels{queueLabel},
@@ -152,15 +153,18 @@ func PushPods(ctx context.Context, c client.Client, namespaces []string, queueLa
defer log.V(3).Info("End pods list")
page := 0
for {
err := c.List(ctx, lst, ListOptions(ns, queueLabel, lst.Continue)...)
err := c.List(ctx, lst, listOptions(ns, queueLabel, lst.Continue)...)
if err != nil {
log.Error(err, "list")
return fmt.Errorf("listing pods in %s, page %d: %w", ns, page, err)
}

for _, p := range lst.Items {
// ignore deleted or in final Phase?
ch <- p
if p.Status.Phase == corev1.PodFailed || p.Status.Phase == corev1.PodSucceeded {
log.V(2).Info("Skip pod", "pod", klog.KObj(&p), "phase", p.Status.Phase)
} else {
ch <- p
}
}

page++
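
The body of `listOptions` is only partially visible in this diff. A sketch of how the paginated listing in `PushPods` typically fits together, where the `Limit`/`Continue` options and the page size are assumptions rather than lines from this PR:

```go
package util

import (
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const listLength = 100 // assumed page size

// listOptions builds the options for the paginated c.List calls in PushPods;
// each response fills lst.Continue, which is passed back in for the next page
// until the server returns an empty continue token.
func listOptions(namespace, queueLabel, continueToken string) []client.ListOption {
	return []client.ListOption{
		client.InNamespace(namespace),
		client.HasLabels{queueLabel},
		client.Limit(listLength),
		client.Continue(continueToken),
	}
}
```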
10 changes: 10 additions & 0 deletions test/integration/importer/importer_test.go
@@ -109,6 +109,9 @@ var _ = ginkgo.Describe("Importer", func() {
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl2)
})

wl1UID := wl1.UID
wl2UID := wl2.UID

ginkgo.By("Starting kueue, the cluster queue status should account for the imported Workloads", func() {
fwk.StartManager(ctx, cfg, managerAndSchedulerSetup)

@@ -149,6 +152,13 @@
util.ExpectWorkloadToFinish(ctx, k8sClient, wl2LookupKey)
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wl1, wl3)
})

ginkgo.By("Checking the imported Workloads are not recreated", func() {
gomega.Expect(k8sClient.Get(ctx, wl1LookupKey, wl1)).To(gomega.Succeed())
gomega.Expect(wl1.UID).To(gomega.Equal(wl1UID))
gomega.Expect(k8sClient.Get(ctx, wl2LookupKey, wl2)).To(gomega.Succeed())
gomega.Expect(wl2.UID).To(gomega.Equal(wl2UID))
})
})
})
