Skip to content

Commit

Permalink
e2e_node: DRA: test plugin failures
Browse files Browse the repository at this point in the history
  • Loading branch information
bart0sh committed Apr 29, 2024
1 parent a9eded0 commit 1a840b9
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 14 deletions.
36 changes: 28 additions & 8 deletions test/e2e/dra/test-driver/app/gomega.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,42 @@ var BeRegistered = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error
return false, nil
}).WithMessage("contain successful NotifyRegistrationStatus call")

// NodePrepareResourceCalled checks that NodePrepareResource API has been called
var NodePrepareResourceCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
// NodePrepareResourcesSucceeded checks that NodePrepareResources API has been called and succeeded
var NodePrepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResource") && call.Err == nil {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err == nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain NodePrepareResource call")
}).WithMessage("contain NodePrepareResources call")

// NodePrepareResourcesCalled checks that NodePrepareResources API has been called
var NodePrepareResourcesCalled = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
// NodePrepareResourcesErrored checks that NodePrepareResources API has been called and returned an error
var NodePrepareResourcesErrored = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
for _, call := range actualCalls {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err == nil {
if strings.HasSuffix(call.FullMethod, "/NodePrepareResources") && call.Err != nil {
return true, nil
}
}
return false, nil
}).WithMessage("contain NodePrepareResources call")
}).WithMessage("contain NodePrepareResources errored call")

// NodeUnprepareResourcesSucceeded checks that the NodeUnprepareResources API
// has been called and that at least one such call returned no error.
// The matcher is applied to a slice of GRPCCall records.
var NodeUnprepareResourcesSucceeded = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
	for _, call := range actualCalls {
		// Only a call without an error counts; errored attempts are skipped.
		if strings.HasSuffix(call.FullMethod, "/NodeUnprepareResources") && call.Err == nil {
			return true, nil
		}
	}
	return false, nil
	// The message names the success requirement so that a failure caused by
	// errored-only calls is not reported as "no call was made".
}).WithMessage("contain successful NodeUnprepareResources call")

// NodeUnprepareResourcesErrored reports whether the given GRPCCall records
// include a NodeUnprepareResources call that returned an error.
var NodeUnprepareResourcesErrored = gcustom.MakeMatcher(func(actualCalls []GRPCCall) (bool, error) {
	for i := range actualCalls {
		// Calls that completed without an error can never satisfy this matcher.
		if actualCalls[i].Err == nil {
			continue
		}
		if strings.HasSuffix(actualCalls[i].FullMethod, "/NodeUnprepareResources") {
			return true, nil
		}
	}
	return false, nil
}).WithMessage("contain NodeUnprepareResources errored call")
13 changes: 12 additions & 1 deletion test/e2e/dra/test-driver/app/kubeletplugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type ExamplePlugin struct {
prepared map[ClaimID]any
gRPCCalls []GRPCCall

block bool
block bool
failure error
}

type GRPCCall struct {
Expand Down Expand Up @@ -168,6 +169,10 @@ func (ex *ExamplePlugin) Block() {
ex.block = true
}

// SetFailure stores err as the plugin's simulated failure. While the stored
// error is non-nil, NodePrepareResources and NodeUnprepareResources return it
// before processing any claim of the request. Passing nil clears the failure.
//
// NOTE(review): the field is assigned here without any visible locking while
// the gRPC handlers read it — confirm whether a lock should guard this write.
func (ex *ExamplePlugin) SetFailure(err error) {
ex.failure = err
}

// NodePrepareResource ensures that the CDI file for the claim exists. It uses
// a deterministic name to simplify NodeUnprepareResource (no need to remember
// or discover the name) and idempotency (when called again, the file simply
Expand Down Expand Up @@ -309,6 +314,9 @@ func (ex *ExamplePlugin) NodePrepareResources(ctx context.Context, req *drapbv1a
resp := &drapbv1alpha3.NodePrepareResourcesResponse{
Claims: make(map[string]*drapbv1alpha3.NodePrepareResourceResponse),
}
if ex.failure != nil {
return resp, ex.failure
}
for _, claimReq := range req.Claims {
cdiDevices, err := ex.nodePrepareResource(ctx, claimReq.Name, claimReq.Uid, claimReq.ResourceHandle, claimReq.StructuredResourceHandle)
if err != nil {
Expand Down Expand Up @@ -381,6 +389,9 @@ func (ex *ExamplePlugin) NodeUnprepareResources(ctx context.Context, req *drapbv
resp := &drapbv1alpha3.NodeUnprepareResourcesResponse{
Claims: make(map[string]*drapbv1alpha3.NodeUnprepareResourceResponse),
}
if ex.failure != nil {
return resp, ex.failure
}
for _, claimReq := range req.Claims {
err := ex.nodeUnprepareResource(ctx, claimReq.Name, claimReq.Uid, claimReq.ResourceHandle, claimReq.StructuredResourceHandle)
if err != nil {
Expand Down
112 changes: 107 additions & 5 deletions test/e2e_node/dra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ package e2enode

import (
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -97,7 +99,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
ginkgo.It("must process pod created when kubelet is not running", func(ctx context.Context) {
// Stop Kubelet
startKubelet := stopKubelet()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", 0)
// Pod must be in pending state
err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
return pod.Status.Phase == v1.PodPending, nil
Expand All @@ -113,7 +115,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
ginkgo.It("must keep pod in pending state if NodePrepareResources times out", func(ctx context.Context) {
ginkgo.By("set delay for the NodePrepareResources call")
kubeletPlugin.Block()
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod")
pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", 0)

ginkgo.By("wait for pod to be in Pending state")
err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, func(pod *v1.Pod) (bool, error) {
Expand All @@ -122,14 +124,114 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
framework.ExpectNoError(err)

ginkgo.By("wait for NodePrepareResources call")
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(dra.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesCalled)
gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(dra.PluginClientTimeout * 2).Should(testdriver.NodePrepareResourcesSucceeded)

// TODO: Check condition or event when implemented
// see https://github.com/kubernetes/kubernetes/issues/118468 for details
ginkgo.By("check that pod is consistently in Pending state")
gomega.Consistently(ctx, e2epod.Get(f.ClientSet, pod)).WithTimeout(podInPendingStateTimeout).Should(e2epod.BeInPhase(v1.PodPending),
"Pod should be in Pending state as resource preparation time outed")
})

// Verifies that a pod still completes when the plugin first returns an error
// from NodePrepareResources and later stops doing so.
ginkgo.It("must run pod if NodePrepareResources fails and then succeeds", func(ctx context.Context) {
	// Arm the simulated failure before the test objects are created, so the
	// plugin's prepare handler returns an error until the failure is cleared.
	kubeletPlugin.SetFailure(errors.New("Simulated failure"))
	callTimeout := dra.PluginClientTimeout * 2
	pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", 0)

	ginkgo.By("wait for pod to be in Pending state")
	isPending := func(p *v1.Pod) (bool, error) {
		return p.Status.Phase == v1.PodPending, nil
	}
	framework.ExpectNoError(e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, isPending))

	ginkgo.By("wait for NodePrepareResources call to fail")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodePrepareResourcesErrored)

	// Clear the failure so that a later prepare call can succeed.
	kubeletPlugin.SetFailure(nil)

	ginkgo.By("wait for NodePrepareResources call to succeed")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodePrepareResourcesSucceeded)

	ginkgo.By("wait for pod to succeed")
	framework.ExpectNoError(e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name))
})

// Verifies that a pod still completes when the plugin first returns an error
// from NodeUnprepareResources and later stops doing so.
ginkgo.It("must run pod if NodeUnprepareResources fails and then succeeds", func(ctx context.Context) {
	callTimeout := dra.PluginClientTimeout * 2
	// The trailing 10 is the sleep time handed to createTestObjects.
	pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", 10)

	ginkgo.By("wait for pod to run")
	framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod))

	// The failure is armed only after the pod is running, i.e. after the
	// wait above has returned without error.
	kubeletPlugin.SetFailure(errors.New("Simulated failure"))
	ginkgo.By("wait for NodeUnprepareResources call to fail")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodeUnprepareResourcesErrored)

	// Clear the failure so that a later unprepare call can succeed.
	kubeletPlugin.SetFailure(nil)

	ginkgo.By("wait for NodeUnprepareResources call to succeed")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodeUnprepareResourcesSucceeded)

	ginkgo.By("wait for pod to succeed")
	framework.ExpectNoError(e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name))
})

// Verifies that NodePrepareResources is called again, and succeeds, after the
// kubelet has been stopped and started following a failed prepare call.
ginkgo.It("must retry NodePrepareResources after Kubelet restart", func(ctx context.Context) {
	// Arm the simulated failure before the test objects are created.
	kubeletPlugin.SetFailure(errors.New("Simulated failure"))
	callTimeout := dra.PluginClientTimeout * 2
	pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", 0)

	ginkgo.By("wait for pod to be in Pending state")
	isPending := func(p *v1.Pod) (bool, error) {
		return p.Status.Phase == v1.PodPending, nil
	}
	framework.ExpectNoError(e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Pending", framework.PodStartShortTimeout, isPending))

	ginkgo.By("wait for NodePrepareResources call to fail")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodePrepareResourcesErrored)

	ginkgo.By("stop Kubelet")
	restartKubelet := stopKubelet()

	// The failure is cleared between stopping and starting the kubelet.
	kubeletPlugin.SetFailure(nil)

	ginkgo.By("start Kubelet")
	restartKubelet()

	ginkgo.By("wait for NodePrepareResources call to succeed")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodePrepareResourcesSucceeded)

	ginkgo.By("wait for pod to succeed")
	framework.ExpectNoError(e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name))
})

// Verifies that NodeUnprepareResources is called again, and succeeds, after
// the kubelet has been stopped and started following a failed unprepare call.
ginkgo.It("must retry NodeUnprepareResources after Kubelet restart", func(ctx context.Context) {
	callTimeout := dra.PluginClientTimeout * 2
	// The trailing 10 is the sleep time handed to createTestObjects.
	pod := createTestObjects(ctx, f.ClientSet, getNodeName(ctx, f), f.Namespace.Name, "draclass", "external-claim", "drapod", 10)

	ginkgo.By("wait for pod to run")
	framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod))

	// The failure is armed only after the pod is running.
	kubeletPlugin.SetFailure(errors.New("Simulated failure"))
	ginkgo.By("wait for NodeUnprepareResources call to fail")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodeUnprepareResourcesErrored)

	ginkgo.By("stop Kubelet")
	restartKubelet := stopKubelet()

	// The failure is cleared between stopping and starting the kubelet.
	kubeletPlugin.SetFailure(nil)

	ginkgo.By("start Kubelet")
	restartKubelet()

	ginkgo.By("wait for NodeUnprepareResources call to succeed")
	gomega.Eventually(kubeletPlugin.GetGRPCCalls).WithTimeout(callTimeout).Should(testdriver.NodeUnprepareResourcesSucceeded)

	ginkgo.By("wait for pod to succeed")
	framework.ExpectNoError(e2epod.WaitForPodSuccessInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name))
})
})
})

Expand Down Expand Up @@ -170,7 +272,7 @@ func newKubeletPlugin(ctx context.Context, nodeName string) *testdriver.ExampleP
// NOTE: as scheduler and controller manager are not running by the Node e2e,
// the objects must contain all required data to be processed correctly by the API server
// and placed on the node without involving the scheduler and the DRA controller
func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string) *v1.Pod {
func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, nodename, namespace, className, claimName, podName string, sleepTime int) *v1.Pod {
// ResourceClass
class := &resourcev1alpha2.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -222,7 +324,7 @@ func createTestObjects(ctx context.Context, clientSet kubernetes.Interface, node
Resources: v1.ResourceRequirements{
Claims: []v1.ResourceClaim{{Name: podClaimName}},
},
Command: []string{"/bin/sh", "-c", "env | grep DRA_PARAM1=PARAM1_VALUE"},
Command: []string{"/bin/sh", "-c", fmt.Sprintf("env | grep DRA_PARAM1=PARAM1_VALUE && sleep %d", sleepTime)},
},
},
RestartPolicy: v1.RestartPolicyNever,
Expand Down

0 comments on commit 1a840b9

Please sign in to comment.