add e2e test for single cluster jobset
kannon92 committed Mar 25, 2024
1 parent a8cd0fc commit 5e1a8ba
Showing 6 changed files with 134 additions and 14 deletions.
7 changes: 5 additions & 2 deletions Makefile
@@ -95,6 +95,9 @@ LD_FLAGS += -X '$(version_pkg).GitCommit=$(shell git rev-parse HEAD)'
RELEASE_VERSION=v0.6.1
RELEASE_BRANCH=main

# JobSet Version
JOBSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/jobset)

.PHONY: all
all: generate fmt vet build

@@ -188,14 +191,14 @@ MULTIKUEUE-E2E_TARGETS := $(addprefix run-test-multikueue-e2e-,${E2E_K8S_VERSION
.PHONY: test-e2e-all
test-e2e-all: ginkgo $(E2E_TARGETS) $(MULTIKUEUE-E2E_TARGETS)


FORCE:

run-test-e2e-%: K8S_VERSION = $(@:run-test-e2e-%=%)
run-test-e2e-%: FORCE
@echo Running e2e for k8s ${K8S_VERSION}
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" ./hack/e2e-test.sh
E2E_KIND_VERSION="kindest/node:v$(K8S_VERSION)" KIND_CLUSTER_NAME=$(KIND_CLUSTER_NAME) CREATE_KIND_CLUSTER=$(CREATE_KIND_CLUSTER) ARTIFACTS="$(ARTIFACTS)/$@" IMAGE_TAG=$(IMAGE_TAG) GINKGO_ARGS="$(GINKGO_ARGS)" JOBSET_VERSION=$(JOBSET_VERSION) ./hack/e2e-test.sh

JOBSET_VERSION = $(shell $(GO_CMD) list -m -f "{{.Version}}" sigs.k8s.io/jobset)
run-test-multikueue-e2e-%: K8S_VERSION = $(@:run-test-multikueue-e2e-%=%)
run-test-multikueue-e2e-%: FORCE
@echo Running multikueue e2e for k8s ${K8S_VERSION}
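Note: the new JOBSET_VERSION variable is read from go.mod and forwarded to hack/e2e-test.sh as an environment variable. A minimal sketch of how the value resolves and how the per-version target is invoked; the version strings below are illustrative, not taken from this commit:

# Resolve the JobSet version pinned in go.mod (output shown is a placeholder):
go list -m -f "{{.Version}}" sigs.k8s.io/jobset
# -> v0.4.0 (illustrative)

# Run the single-cluster e2e for one Kubernetes version from E2E_K8S_VERSIONS
# (the patch version here is illustrative):
make run-test-e2e-1.29.2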
10 changes: 10 additions & 0 deletions hack/e2e-common.sh
@@ -19,6 +19,9 @@ export GINKGO="$ROOT_DIR"/bin/ginkgo
export KIND="$ROOT_DIR"/bin/kind
export YQ="$ROOT_DIR"/bin/yq

export JOBSET_MANIFEST=https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml
export JOBSET_IMAGE=registry.k8s.io/jobset/jobset:${JOBSET_VERSION}
export JOBSET_CRDS=${ROOT_DIR}/dep-crds/jobset-operator/

# $1 - cluster name
function cluster_cleanup {
@@ -57,6 +60,13 @@ function cluster_kueue_deploy {
kubectl apply --server-side -k test/e2e/config
}

#$1 - cluster name
function install_jobset {
cluster_kind_load_image ${1} ${JOBSET_IMAGE}
kubectl config use-context kind-${1}
kubectl apply --server-side -f ${JOBSET_MANIFEST}
}

export INITIAL_IMAGE=$($YQ '.images[] | select(.name == "controller") | [.newName, .newTag] | join(":")' config/components/manager/kustomization.yaml)

function restore_managers_image {
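Note: install_jobset takes a kind cluster name, side-loads the controller image, and applies the release manifest, as called from hack/e2e-test.sh below. A hedged way to confirm the controller is up afterwards; the jobset-system namespace and deployment name are assumed from the upstream JobSet manifests rather than shown in this diff:

install_jobset "$KIND_CLUSTER_NAME"
# Assumed names from the upstream manifests, not part of this commit:
kubectl -n jobset-system rollout status deployment/jobset-controller-manager --timeout=5m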
7 changes: 7 additions & 0 deletions hack/e2e-test.sh
@@ -22,6 +22,10 @@ SOURCE_DIR="$(cd "$(dirname -- "${BASH_SOURCE[0]}")" && pwd -P)"
ROOT_DIR="$SOURCE_DIR/.."
export E2E_TEST_IMAGE=gcr.io/k8s-staging-perf-tests/sleep:v0.1.0

export JOBSET_MANIFEST=https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml
export JOBSET_IMAGE=registry.k8s.io/jobset/jobset:${JOBSET_VERSION}
export JOBSET_CRDS=${ROOT_DIR}/dep-crds/jobset-operator/

source ${SOURCE_DIR}/e2e-common.sh

function cleanup {
@@ -52,6 +56,9 @@ function kind_load {
docker pull $E2E_TEST_IMAGE
cluster_kind_load $KIND_CLUSTER_NAME
fi
docker pull registry.k8s.io/jobset/jobset:$JOBSET_VERSION
kubectl apply --server-side -f ${JOBSET_CRDS}/*
install_jobset $KIND_CLUSTER_NAME
}

function kueue_deploy {
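Note: kind_load now pre-pulls the JobSet image, applies the vendored CRDs from dep-crds, and then calls install_jobset. A sketch of checking that the JobSet CRD is established before the suite starts; the CRD name is assumed from the upstream jobset.x-k8s.io API group, not shown in this diff:

kubectl wait --for=condition=Established crd/jobsets.jobset.x-k8s.io --timeout=2m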
12 changes: 0 additions & 12 deletions hack/multikueue-e2e-test.sh
@@ -25,10 +25,6 @@ export MANAGER_KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME}-manager
export WORKER1_KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME}-worker1
export WORKER2_KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME}-worker2

export JOBSET_MANIFEST=https://github.com/kubernetes-sigs/jobset/releases/download/${JOBSET_VERSION}/manifests.yaml
export JOBSET_IMAGE=registry.k8s.io/jobset/jobset:${JOBSET_VERSION}
export JOBSET_CRDS=${ROOT_DIR}/dep-crds/jobset-operator/

source ${SOURCE_DIR}/e2e-common.sh

function cleanup {
@@ -72,14 +68,6 @@ function startup {
fi
}


#$1 - cluster name
function install_jobset {
cluster_kind_load_image ${1} ${JOBSET_IMAGE}
kubectl config use-context kind-${1}
kubectl apply --server-side -f ${JOBSET_MANIFEST}
}

function kind_load {
if [ $CREATE_KIND_CLUSTER == 'true' ]
then
103 changes: 103 additions & 0 deletions test/e2e/singlecluster/e2e_test.go
@@ -27,13 +27,16 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/test/util"
)
@@ -490,6 +493,106 @@ var _ = ginkgo.Describe("Kueue", func() {
})
})
})
ginkgo.When("Creating a JobSet", func() {
var (
defaultRf *kueue.ResourceFlavor
localQueue *kueue.LocalQueue
clusterQueue *kueue.ClusterQueue
)
ginkgo.BeforeEach(func() {
defaultRf = testing.MakeResourceFlavor("default").Obj()
gomega.Expect(k8sClient.Create(ctx, defaultRf)).Should(gomega.Succeed())
clusterQueue = testing.MakeClusterQueue("cluster-queue").
ResourceGroup(
*testing.MakeFlavorQuotas(defaultRf.Name).
Resource(corev1.ResourceCPU, "2").
Resource(corev1.ResourceMemory, "2G").Obj()).Obj()
gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed())
localQueue = testing.MakeLocalQueue("main", ns.Name).ClusterQueue("cluster-queue").Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
})
ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
gomega.Expect(util.DeleteAllJobsetsInNamespace(ctx, k8sClient, ns)).Should(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, defaultRf, true)
})

ginkgo.It("Should run a jobSet if admitted", func() {
jobSet := testingjobset.MakeJobSet("job-set", ns.Name).
Queue("main").
ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
Replicas: 2,
Parallelism: 2,
Completions: 2,
Image: "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
// Give it the time to be observed Active in the live status update step.
Args: []string{"5s"},
},
).
Request("replicated-job-1", "cpu", "500m").
Request("replicated-job-1", "memory", "200M").
Obj()

ginkgo.By("Creating the jobSet", func() {
gomega.Expect(k8sClient.Create(ctx, jobSet)).Should(gomega.Succeed())
})

createdLeaderWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name, jobSet.UID), Namespace: ns.Name}

ginkgo.By("Waiting for the jobSet to get status updates", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdJobset := &jobset.JobSet{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(jobSet), createdJobset)).To(gomega.Succeed())

g.Expect(createdJobset.Status.ReplicatedJobsStatus).To(gomega.BeComparableTo([]jobset.ReplicatedJobStatus{
{
Name: "replicated-job-1",
Ready: 2,
Active: 2,
},
}, cmpopts.IgnoreFields(jobset.ReplicatedJobStatus{}, "Succeeded", "Failed")))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the jobSet to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())

g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: "JobSetFinished",
Message: "JobSet finished successfully",
}, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Checking no objects are left in the cluster and the jobSet is completed", func() {
gomega.Eventually(func(g gomega.Gomega) {
workerWl := &kueue.Workload{}
g.Expect(k8sClient.Get(ctx, wlLookupKey, workerWl)).To(testing.BeNotFoundError())
workerJobSet := &jobset.JobSet{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(jobSet), workerJobSet)).To(testing.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

createdJobSet := &jobset.JobSet{}
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(jobSet), createdJobSet)).To(gomega.Succeed())
gomega.Expect(ptr.Deref(createdJobSet.Spec.Suspend, true)).To(gomega.BeFalse())
gomega.Expect(createdJobSet.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(
metav1.Condition{
Type: string(jobset.JobSetCompleted),
Status: metav1.ConditionTrue,
Reason: "AllJobsCompleted",
Message: "jobset completed successfully",
},
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"))))
})
})
})
})

func expectJobUnsuspended(key types.NamespacedName) {
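Note: to iterate on just this spec locally, a ginkgo focus expression can be passed through GINKGO_ARGS; that the script forwards it verbatim to ginkgo is assumed from the Makefile invocation above, and the Kubernetes version is illustrative:

make run-test-e2e-1.29.2 GINKGO_ARGS='--focus="Creating a JobSet"'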
9 changes: 9 additions & 0 deletions test/util/util.go
@@ -34,6 +34,7 @@ import (
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobs/pod"
@@ -134,6 +135,14 @@ func DeleteAllJobsInNamespace(ctx context.Context, c client.Client, ns *corev1.N
return nil
}

func DeleteAllJobsetsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
err := c.DeleteAllOf(ctx, &jobset.JobSet{}, client.InNamespace(ns.Name), client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil && !apierrors.IsNotFound(err) {
return err
}
return nil
}

func DeleteAllPodsInNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace) error {
err := c.DeleteAllOf(ctx, &corev1.Pod{}, client.InNamespace(ns.Name))
if err != nil && !apierrors.IsNotFound(err) {
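Note: for orientation, a rough kubectl equivalent of what DeleteAllJobsetsInNamespace does (delete every JobSet in the namespace with background propagation, tolerating absence); the namespace variable is illustrative:

kubectl delete jobsets --all -n "$NS" --cascade=background --ignore-not-found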
