From 16b22286e1a18d1a8ef2c92cd792174b074bb597 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 26 Feb 2024 16:11:06 +0100 Subject: [PATCH] dra kubelet: implement NodeResourceSlice controller kubelet is responsible for publishing the NodeResourceSlice objects for its node. The new controller deletes any extra objects that don't match some registered driver, updates existing ones, and creates new ones as needed to ensure that the information provided by drivers is replicated correctly in the cluster. --- cmd/kubelet/app/server.go | 1 + pkg/kubelet/cm/container_manager_linux.go | 4 +- pkg/kubelet/cm/dra/manager.go | 2 +- pkg/kubelet/cm/dra/manager_test.go | 8 +- pkg/kubelet/cm/dra/plugin/client_test.go | 85 ++++ pkg/kubelet/cm/dra/plugin/noderesources.go | 447 ++++++++++++++++++ pkg/kubelet/cm/dra/plugin/plugin.go | 33 +- pkg/kubelet/cm/dra/plugin/plugin_test.go | 14 +- pkg/kubelet/kubelet.go | 2 +- test/e2e/dra/deploy.go | 32 ++ test/e2e/dra/dra.go | 257 +++++----- test/e2e/dra/test-driver/app/kubeletplugin.go | 1 + 12 files changed, 754 insertions(+), 132 deletions(-) create mode 100644 pkg/kubelet/cm/dra/plugin/noderesources.go diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 9a9338e7705e2..f0a8c020990bf 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -842,6 +842,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend s.FailSwapOn, kubeDeps.Recorder, kubeDeps.KubeClient, + nodeName, ) if err != nil { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 4b3d9ffe9dc2d..0b571028402fe 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -198,7 +198,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) { // TODO(vmarmol): Add limits to the system containers. // Takes the absolute name of the specified containers. // Empty container name disables use of the specified container. -func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) { +func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface, nodeName types.NodeName) (ContainerManager, error) { subsystems, err := GetCgroupSubsystems() if err != nil { return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err) @@ -308,7 +308,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I // initialize DRA manager if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager") - cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir) + cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeName) if err != nil { return nil, err } diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 085366ac0afcf..49cd60cb266d6 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -45,7 +45,7 @@ type ManagerImpl struct { } // NewManagerImpl creates a new manager. 
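The signature change below threads the node name from cmd/kubelet/app/server.go through NewContainerManager into the DRA manager. A rough sketch of where that extra argument ends up; the field and type names here are assumptions for illustration, not the actual ManagerImpl definition:

package dra

import (
	"k8s.io/apimachinery/pkg/types"
	clientset "k8s.io/client-go/kubernetes"
)

// managerSketch illustrates the shape of the change: the kubelet's node name
// is now available wherever the DRA manager needs to publish or look up
// node-local API objects. Field names are illustrative only.
type managerSketch struct {
	kubeClient clientset.Interface
	nodeName   types.NodeName
}

func newManagerSketch(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*managerSketch, error) {
	return &managerSketch{kubeClient: kubeClient, nodeName: nodeName}, nil
}
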
-func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) { +func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) { klog.V(2).InfoS("Creating DRA manager") claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName) diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 9e5e8c4bd673c..dc0e697d0dd16 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -154,7 +154,7 @@ func TestNewManagerImpl(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory) + manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory, "worker") if test.wantErr { assert.Error(t, err) return @@ -287,7 +287,7 @@ func TestGetResources(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - manager, err := NewManagerImpl(kubeClient, t.TempDir()) + manager, err := NewManagerImpl(kubeClient, t.TempDir(), "worker") assert.NoError(t, err) if test.claimInfo != nil { @@ -760,7 +760,7 @@ func TestPrepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler() + plg := plugin.NewRegistrationHandler(nil, "worker") if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } @@ -1060,7 +1060,7 @@ func TestUnprepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler() + plg := plugin.NewRegistrationHandler(nil, "worker") if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } diff --git a/pkg/kubelet/cm/dra/plugin/client_test.go b/pkg/kubelet/cm/dra/plugin/client_test.go index 3e1889e2c9787..339b57b9d5cdf 100644 --- a/pkg/kubelet/cm/dra/plugin/client_test.go +++ b/pkg/kubelet/cm/dra/plugin/client_test.go @@ -43,6 +43,12 @@ func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in * return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil } +func (f *fakeV1alpha3GRPCServer) NodeResources(req *drapbv1alpha3.NodeResourcesRequest, srv drapbv1alpha3.Node_NodeResourcesServer) error { + srv.Send(&drapbv1alpha3.NodeResourcesResponse{}) + srv.Send(&drapbv1alpha3.NodeResourcesResponse{}) + return nil +} + type fakeV1alpha2GRPCServer struct { drapbv1alpha2.UnimplementedNodeServer } @@ -288,3 +294,82 @@ func TestNodeUnprepareResource(t *testing.T) { }) } } + +func TestNodeResources(t *testing.T) { + for _, test := range []struct { + description string + serverSetup func(string) (string, tearDown, error) + serverVersion string + request *drapbv1alpha3.NodeResourcesRequest + responses []*drapbv1alpha3.NodeResourcesResponse + expectError string + }{ + { + description: "server supports NodeResources API", + serverSetup: setupFakeGRPCServer, + serverVersion: v1alpha3Version, + request: &drapbv1alpha3.NodeResourcesRequest{}, + responses: []*drapbv1alpha3.NodeResourcesResponse{ + {}, + {}, + }, + expectError: "EOF", + }, + { + description: "server doesn't support NodeResources API", + serverSetup: setupFakeGRPCServer, + serverVersion: v1alpha2Version, + request: new(drapbv1alpha3.NodeResourcesRequest), + expectError: "Unimplemented", + }, + } { + t.Run(test.description, func(t 
*testing.T) { + addr, teardown, err := setupFakeGRPCServer(test.serverVersion) + if err != nil { + t.Fatal(err) + } + defer teardown() + + p := &plugin{ + endpoint: addr, + version: v1alpha3Version, + } + + conn, err := p.getOrCreateGRPCConn() + defer func() { + err := conn.Close() + if err != nil { + t.Error(err) + } + }() + if err != nil { + t.Fatal(err) + } + + draPlugins.add("dummy-plugin", p) + defer draPlugins.delete("dummy-plugin") + + client, err := NewDRAPluginClient("dummy-plugin") + if err != nil { + t.Fatal(err) + } + + stream, err := client.NodeResources(context.Background(), test.request) + if err != nil { + t.Fatal(err) + } + var actualResponses []*drapbv1alpha3.NodeResourcesResponse + var actualErr error + for { + resp, err := stream.Recv() + if err != nil { + actualErr = err + break + } + actualResponses = append(actualResponses, resp) + } + assert.Equal(t, test.responses, actualResponses) + assert.Contains(t, actualErr.Error(), test.expectError) + }) + } +} diff --git a/pkg/kubelet/cm/dra/plugin/noderesources.go b/pkg/kubelet/cm/dra/plugin/noderesources.go new file mode 100644 index 0000000000000..94b27f13fc641 --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/noderesources.go @@ -0,0 +1,447 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "k8s.io/api/resource/v1alpha2" + resourceapi "k8s.io/api/resource/v1alpha2" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + resourceinformers "k8s.io/client-go/informers/resource/v1alpha2" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3" +) + +const ( + // resyncPeriod for informer + // TODO: disable? + resyncPeriod = time.Duration(10 * time.Minute) +) + +// nodeResourcesController collects resource information from all registered +// plugins and synchronizes that information with NodeResourceSlice objects. +type nodeResourcesController struct { + ctx context.Context + kubeClient kubernetes.Interface + nodeName string + wg sync.WaitGroup + queue workqueue.RateLimitingInterface + sliceStore cache.Store + + mutex sync.RWMutex + activePlugins map[string]*activePlugin +} + +// activePlugin holds the resource information about one plugin +// and the gRPC stream that is used to retrieve that. The context +// used by that stream can be canceled separately to stop +// the monitoring. +type activePlugin struct { + cancel func(reason error) + + // resources is protected by the nodeResourcesController read/write lock. + // When receiving updates from the driver, the entire slice gets replaced, + // so it is okay to not do a deep copy of it. 
Only retrieving the slice + // must be protected by a read lock. + resources []*v1alpha2.NodeResourceModel +} + +// startNodeResourcesController constructs a new controller and starts it. +// +// If a kubeClient is provided, then it synchronizes NodeResourceSlices +// with the resource information provided by plugins. Without it, +// the controller is inactive. +func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) *nodeResourcesController { + if kubeClient == nil { + return nil + } + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "node resources controller") + ctx = klog.NewContext(ctx, logger) + + c := &nodeResourcesController{ + ctx: ctx, + kubeClient: kubeClient, + nodeName: nodeName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"), + activePlugins: make(map[string]*activePlugin), + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.run(ctx) + }() + + return c +} + +// waitForStop blocks until all background activity spawned by +// the controller has stopped. The context passed to start must +// be canceled for that to happen. +func (c *nodeResourcesController) waitForStop() { + if c == nil { + return + } + + c.wg.Wait() +} + +// addPlugin is called whenever a plugin has been (re-)registered. +func (c *nodeResourcesController) addPlugin(driverName string, pluginInstance *plugin) { + if c == nil { + return + } + + klog.FromContext(c.ctx).V(2).Info("Adding plugin", "driverName", driverName) + c.mutex.Lock() + defer c.mutex.Unlock() + + if active := c.activePlugins[driverName]; active != nil { + active.cancel(errors.New("plugin has re-registered")) + } + active := &activePlugin{} + cancelCtx, cancel := context.WithCancelCause(c.ctx) + active.cancel = cancel + c.activePlugins[driverName] = active + c.queue.Add(driverName) + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.monitorPlugin(cancelCtx, active, driverName, pluginInstance) + }() +} + +// removePlugin is called whenever a plugin has been unregistered. +func (c *nodeResourcesController) removePlugin(driverName string) { + if c == nil { + return + } + + klog.FromContext(c.ctx).V(2).Info("Removing plugin", "driverName", driverName) + c.mutex.Lock() + defer c.mutex.Unlock() + if active, ok := c.activePlugins[driverName]; ok { + active.cancel(errors.New("plugin has unregistered")) + delete(c.activePlugins, driverName) + c.queue.Add(driverName) + } +} + +func (c *nodeResourcesController) monitorPlugin(ctx context.Context, active *activePlugin, driverName string, pluginInstance *plugin) { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "driverName", driverName) + logger.Info("Starting to monitor node resources of the plugin") + defer func() { + r := recover() + logger.Info("Stopping to monitor node resources of the plugin", "reason", context.Cause(ctx), "err", ctx.Err(), "recover", r) + }() + + // Keep trying until canceled. + for ctx.Err() == nil { + logger.V(5).Info("Calling NodeResources") + stream, err := pluginInstance.NodeResources(ctx, new(drapb.NodeResourcesRequest)) + if err != nil { + switch { + case status.Convert(err).Code() == codes.Unimplemented: + // The plugin simply doesn't provide node resources. + active.cancel(errors.New("plugin does not support node resource reporting")) + default: + // This is a problem, report it and retry. + logger.Error(err, "Creating gRPC stream for node resources failed") + // TODO: expontential backoff? 
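	// For now the retry below uses a flat 5 second delay; the select also
	// wakes up early when the context gets canceled. An exponential backoff
	// would start with a short delay and double it up to some cap after each
	// consecutive failure, resetting once a stream has been established again.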
+ select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + } + } + continue + } + for { + response, err := stream.Recv() + if err != nil { + switch { + case errors.Is(err, io.EOF): + // This is okay. Some plugins might never change their + // resources after reporting them once. + active.cancel(errors.New("plugin has closed the stream")) + case status.Convert(err).Code() == codes.Unimplemented: + // The plugin has the method, does not really implement it. + active.cancel(errors.New("plugin does not support node resource reporting")) + case ctx.Err() == nil: + // This is a problem, report it and retry. + logger.Error(err, "Reading node resources from gRPC stream failed") + // TODO: expontential backoff? + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + } + } + break + } + + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("Driver resources updated", "resources", response.Resources) + } else { + logger.V(5).Info("Driver resources updated", "numResources", len(response.Resources)) + } + + c.mutex.Lock() + active.resources = response.Resources + c.mutex.Unlock() + c.queue.Add(driverName) + } + } +} + +// run is running in the background. It handles blocking initialization (like +// syncing the informer) and then syncs the actual with the desired state. +func (c *nodeResourcesController) run(ctx context.Context) { + logger := klog.FromContext(ctx) + + // When kubelet starts, we have two choices: + // - Sync immediately, which in practice will delete all NodeResourceSlices + // because no plugin has registered yet. We could do a DeleteCollection + // to speed this up. + // - Wait a bit, then sync. If all plugins have re-registered in the meantime, + // we might not need to change any NodeResourceSlice. + // + // For now syncing starts immediately, with no DeleteCollection. This + // can be reconsidered later. + + // While kubelet starts up, there are errors: + // E0226 13:41:19.880621 126334 reflector.go:150] k8s.io/client-go@v0.0.0/tools/cache/reflector.go:232: Failed to watch *v1alpha2.NodeResourceSlice: failed to list *v1alpha2.NodeResourceSlice: noderesourceslices.resource.k8s.io is forbidden: User "system:anonymous" cannot list resource "noderesourceslices" in API group "resource.k8s.io" at the cluster scope + // + // The credentials used by kubeClient seem to get swapped out later, + // because eventually these list calls succeed. + // TODO: can we avoid these error log entries? Perhaps wait here? + + // We could use an indexer on driver name, but that seems overkill. 
+ informer := resourceinformers.NewFilteredNodeResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) { + options.FieldSelector = "nodeName=" + c.nodeName + }) + c.sliceStore = informer.GetStore() + handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + slice, ok := obj.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + logger.V(5).Info("NodeResourceSlice add", "slice", klog.KObj(slice)) + c.queue.Add(slice.DriverName) + }, + UpdateFunc: func(old, new any) { + oldSlice, ok := old.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + newSlice, ok := new.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("NodeResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice)) + } else { + logger.V(5).Info("NodeResourceSlice update", "slice", klog.KObj(newSlice)) + } + c.queue.Add(newSlice.DriverName) + }, + DeleteFunc: func(obj any) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + slice, ok := obj.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + logger.V(5).Info("NodeResourceSlice delete", "slice", klog.KObj(slice)) + c.queue.Add(slice.DriverName) + }, + }) + if err != nil { + logger.Error(err, "Registering event handler on the NodeResourceSlice informer failed, disabling resource monitoring") + return + } + + // Start informer and wait for our cache to be populated. + c.wg.Add(1) + go func() { + defer c.wg.Done() + informer.Run(ctx.Done()) + }() + for !handler.HasSynced() { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + } + logger.Info("NodeResourceSlice informer has synced") + + for c.processNextWorkItem(ctx) { + } +} + +func (c *nodeResourcesController) processNextWorkItem(ctx context.Context) bool { + key, shutdown := c.queue.Get() + if shutdown { + return false + } + defer c.queue.Done(key) + + driverName := key.(string) + var err error + func() { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("internal error: %v", r) + } + }() + err = c.sync(ctx, driverName) + }() + if err == nil { + c.queue.Forget(key) + return true + } + // TODO: contextual logging in utilruntime + utilruntime.HandleError(fmt.Errorf("processing driver %v: %v", driverName, err)) + c.queue.AddRateLimited(key) + + return true +} + +func (c *nodeResourcesController) sync(ctx context.Context, driverName string) error { + logger := klog.FromContext(ctx) + + // Gather information about the actual and desired state. + slices := c.sliceStore.List() + var driverResources []*resourceapi.NodeResourceModel + c.mutex.RLock() + if active, ok := c.activePlugins[driverName]; ok { + // No need for a deep copy, the entire slice gets replaced on writes. + driverResources = active.resources + } + c.mutex.RUnlock() + + // Resources that are not stored in any slice already need to be published. + // Here we track the indices of the stored resources. + storedResources := sets.New[int]() + + // Slices that don't match any driver resource need to be deleted or can be + // updated. + obsoleteSlices := make([]*resourceapi.NodeResourceSlice, 0, len(slices)) + + // Match slices with resource information. 
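	// Each existing slice either matches one of the driver's current resource
	// models exactly (semantic DeepEqual) and is kept as-is, or it becomes a
	// candidate for reuse or deletion. Indices of matched models are recorded
	// so that only unmatched models get published further down.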
+NEXTOBJ: + for _, obj := range slices { + slice := obj.(*resourceapi.NodeResourceSlice) + if slice.DriverName != driverName { + continue + } + for index, resource := range driverResources { + if apiequality.Semantic.DeepEqual(&slice.NodeResourceModel, resource) { + storedResources.Insert(index) + continue NEXTOBJ + } + } + obsoleteSlices = append(obsoleteSlices, slice) + } + + if loggerV := logger.V(6); loggerV.Enabled() { + // Dump entire resource information. + loggerV.Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", driverResources) + } else { + klog.V(5).Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(driverResources)) + } + + // Update stale slices before removing what's left. + // + // We don't really know which of these slices might have + // been used for "the" driver resource because they don't + // have a unique ID. In practice, a driver is most likely + // to just give us one NodeResourceModel, in which case + // this isn't a problem at all. If we have more than one, + // then at least conceptually it currently doesn't matter + // where we publish it. + // + // The long-term goal is to move the handling of + // NodeResourceSlice objects into the driver, with kubelet + // just acting as a REST proxy. The advantage of that will + // be that kubelet won't need to support the same + // resource API version as the driver and the control plane. + // With that approach, the driver will be able to match + // up objects more intelligently. + numObsoleteSlices := len(obsoleteSlices) + for index, resource := range driverResources { + if storedResources.Has(index) { + continue + } + if numObsoleteSlices > 0 { + slice := obsoleteSlices[numObsoleteSlices-1] + numObsoleteSlices-- + slice = slice.DeepCopy() + slice.NodeResourceModel = *resource + logger.V(5).Info("Reusing existing node resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update node resource slice: %w", err) + } + } else { + slice := &resourceapi.NodeResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: c.nodeName + "-" + driverName + "-", + // TODO: node object as owner + }, + NodeName: c.nodeName, + DriverName: driverName, + NodeResourceModel: *resource, + } + logger.V(5).Info("Creating new node resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create node resource slice: %w", err) + } + } + } + + // All remaining slices are truly orphaned. 
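	// numObsoleteSlices was decremented above for every slice that got reused
	// for an update, so only the first numObsoleteSlices entries still have to
	// be deleted here.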
+ for i := 0; i < numObsoleteSlices; i++ { + slice := obsoleteSlices[i] + logger.V(5).Info("Deleting obsolete node resource slice", "slice", klog.KObj(slice)) + if err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("delete node resource slice: %w", err) + } + } + + return nil +} diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 94a9c7354de55..7acdec5f5300c 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) @@ -94,11 +95,23 @@ func (p *plugin) setVersion(version string) { } // RegistrationHandler is the handler which is fed to the pluginwatcher API. -type RegistrationHandler struct{} +type RegistrationHandler struct { + controller *nodeResourcesController +} // NewPluginHandler returns new registration handler. -func NewRegistrationHandler() *RegistrationHandler { - return &RegistrationHandler{} +// +// Must only be called once per process because it manages global state. +// If a kubeClient is provided, then it synchronizes NodeResourceSlices +// with the resource information provided by plugins. +func NewRegistrationHandler(kubeClient kubernetes.Interface, nodeName string) *RegistrationHandler { + handler := &RegistrationHandler{} + + // If kubelet ever gets an API for stopping registration handlers, then + // that would need to be hooked up with stopping the controller. + handler.controller = startNodeResourcesController(context.TODO(), kubeClient, nodeName) + + return handler } // RegisterPlugin is called when a plugin can be registered. @@ -110,15 +123,18 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, return err } - // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key - // all other DRA components will be able to get the actual socket of DRA plugins by its name. - // By default we assume the supported plugin version is v1alpha3 - draPlugins.add(pluginName, &plugin{ + pluginInstance := &plugin{ conn: nil, endpoint: endpoint, version: v1alpha3Version, highestSupportedVersion: highestSupportedVersion, - }) + } + + // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key + // all other DRA components will be able to get the actual socket of DRA plugins by its name. 
+ // By default we assume the supported plugin version is v1alpha3 + draPlugins.add(pluginName, pluginInstance) + h.controller.addPlugin(pluginName, pluginInstance) return nil } @@ -178,6 +194,7 @@ func deregisterPlugin(pluginName string) { func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { klog.InfoS("DeRegister DRA plugin", "name", pluginName) deregisterPlugin(pluginName) + h.controller.removePlugin(pluginName) } // ValidatePlugin is called by kubelet's plugin watcher upon detection diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 70499b260c848..f9d70238f6018 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -23,6 +23,10 @@ import ( ) func TestRegistrationHandler_ValidatePlugin(t *testing.T) { + newRegistrationHandler := func() *RegistrationHandler { + return NewRegistrationHandler(nil, "worker") + } + for _, test := range []struct { description string handler func() *RegistrationHandler @@ -33,19 +37,19 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { }{ { description: "no versions provided", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, shouldError: true, }, { description: "unsupported version", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, versions: []string{"v2.0.0"}, shouldError: true, }, { description: "plugin already registered with a higher supported version", handler: func() *RegistrationHandler { - handler := NewRegistrationHandler() + handler := newRegistrationHandler() if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}); err != nil { t.Fatal(err) } @@ -57,7 +61,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { }, { description: "should validate the plugin", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, pluginName: "this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide", versions: []string{"v1.3.0"}, }, @@ -74,7 +78,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { } t.Cleanup(func() { - handler := NewRegistrationHandler() + handler := newRegistrationHandler() handler.DeRegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide") handler.DeRegisterPlugin("this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide") }) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index dbf8f554c07f6..d3b5e76077e69 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1541,7 +1541,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) // Adding Registration Callback function for DRA Plugin if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { - kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler())) + kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.hostname))) } // Adding Registration Callback function for Device Manager kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 37468eae1b283..46a3bbe683ad3 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -34,6 +34,7 @@ import ( appsv1 "k8s.io/api/apps/v1" 
v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -54,6 +55,7 @@ const ( NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources" NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource" NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources" + NodeResourcesMethod = "/v1alpha3.Node/NodeResources" ) type Nodes struct { @@ -105,6 +107,7 @@ func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() a // not run on all nodes. resources.Nodes = nodes.NodeNames } + ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last. d.SetUp(nodes, resources) ginkgo.DeferCleanup(d.TearDown) }) @@ -280,6 +283,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) { kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { return d.interceptor(nodename, ctx, req, info, handler) }), + kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + return d.streamInterceptor(nodename, srv, ss, info, handler) + }), kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)), kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)), kubeletplugin.KubeletPluginSocketPath(draAddr), @@ -355,6 +361,16 @@ func (d *Driver) TearDown() { d.wg.Wait() } +func (d *Driver) IsGone(ctx context.Context) { + gomega.Eventually(ctx, func(ctx context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) { + slices, err := d.f.ClientSet.ResourceV1alpha2().NodeResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: "driverName=" + d.Name}) + if err != nil { + return nil, err + } + return slices.Items, err + }).Should(gomega.BeEmpty()) +} + func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { d.mutex.Lock() defer d.mutex.Unlock() @@ -368,6 +384,22 @@ func (d *Driver) interceptor(nodename string, ctx context.Context, req interface return handler(ctx, req) } +func (d *Driver) streamInterceptor(nodename string, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // Stream calls block for a long time. We must not hold the lock while + // they are running. 
+ d.mutex.Lock() + m := MethodInstance{nodename, info.FullMethod} + d.callCounts[m]++ + fail := d.fail[m] + d.mutex.Unlock() + + if fail { + return errors.New("injected error") + } + + return handler(srv, stream) +} + func (d *Driver) Fail(m MethodInstance, injectError bool) { d.mutex.Lock() defer d.mutex.Unlock() diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index b88357d8db14c..557c8a4e49645 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -28,6 +28,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" "github.com/onsi/gomega/gcustom" + "github.com/onsi/gomega/gstruct" v1 "k8s.io/api/core/v1" namedresourcesapi "k8s.io/api/resource/structured/namedresources/v1alpha1" @@ -82,116 +83,170 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.Context("kubelet", func() { nodes := NewNodes(f, 1, 1) - driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance. - b := newBuilder(f, driver) - ginkgo.It("registers plugin", func() { - ginkgo.By("the driver is running") - }) + ginkgo.Context("with ConfigMap parameters", func() { + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) - ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) { - // We have exactly one host. - m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod} + ginkgo.It("registers plugin", func() { + ginkgo.By("the driver is running") + }) - driver.Fail(m, true) + ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) { + // We have exactly one host. + m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod} - ginkgo.By("waiting for container startup to fail") - parameters := b.parameters() - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + driver.Fail(m, true) - b.create(ctx, parameters, pod, template) + ginkgo.By("waiting for container startup to fail") + parameters := b.parameters() + pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - ginkgo.By("wait for NodePrepareResources call") - gomega.Eventually(ctx, func(ctx context.Context) error { - if driver.CallCount(m) == 0 { - return errors.New("NodePrepareResources not called yet") + b.create(ctx, parameters, pod, template) + + ginkgo.By("wait for NodePrepareResources call") + gomega.Eventually(ctx, func(ctx context.Context) error { + if driver.CallCount(m) == 0 { + return errors.New("NodePrepareResources not called yet") + } + return nil + }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) + + ginkgo.By("allowing container startup to succeed") + callCount := driver.CallCount(m) + driver.Fail(m, false) + err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace) + framework.ExpectNoError(err, "start pod with inline resource claim") + if driver.CallCount(m) == callCount { + framework.Fail("NodePrepareResources should have been called again") } - return nil - }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) - - ginkgo.By("allowing container startup to succeed") - callCount := driver.CallCount(m) - driver.Fail(m, false) - err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace) - framework.ExpectNoError(err, "start pod with inline resource claim") - if driver.CallCount(m) == callCount { - framework.Fail("NodePrepareResources should have been called again") - } - }) + }) - ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx 
context.Context) { - // Pretend that the resource is allocated and reserved for some other entity. - // Until the resourceclaim controller learns to remove reservations for - // arbitrary types we can simply fake somthing here. - claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - b.create(ctx, claim) - claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) - framework.ExpectNoError(err, "get claim") - claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} - claim.Status.DriverName = driver.Name - claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ - APIGroup: "example.com", - Resource: "some", - Name: "thing", - UID: "12345", + ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) { + // Pretend that the resource is allocated and reserved for some other entity. + // Until the resourceclaim controller learns to remove reservations for + // arbitrary types we can simply fake somthing here. + claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + b.create(ctx, claim) + claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "get claim") + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} + claim.Status.DriverName = driver.Name + claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ + APIGroup: "example.com", + Resource: "some", + Name: "thing", + UID: "12345", + }) + _, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) + framework.ExpectNoError(err, "update claim") + + pod := b.podExternal() + + // This bypasses scheduling and therefore the pod gets + // to run on the node although it never gets added to + // the `ReservedFor` field of the claim. + pod.Spec.NodeName = nodes.NodeNames[0] + b.create(ctx, pod) + + gomega.Consistently(ctx, func(ctx context.Context) error { + testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err) + } + if testPod.Status.Phase != v1.PodPending { + return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending) + } + return nil + }, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) }) - _, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) - framework.ExpectNoError(err, "update claim") - pod := b.podExternal() + ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) { + parameters := b.parameters() + claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate) + pod := b.podExternal() + zero := int64(0) + pod.Spec.TerminationGracePeriodSeconds = &zero - // This bypasses scheduling and therefore the pod gets - // to run on the node although it never gets added to - // the `ReservedFor` field of the claim. 
- pod.Spec.NodeName = nodes.NodeNames[0] - b.create(ctx, pod) + b.create(ctx, parameters, claim, pod) - gomega.Consistently(ctx, func(ctx context.Context) error { - testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err) - } - if testPod.Status.Phase != v1.PodPending { - return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending) + b.testPod(ctx, f.ClientSet, pod) + + ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name)) + err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) + if !apierrors.IsNotFound(err) { + framework.ExpectNoError(err, "force delete test pod") } - return nil - }, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) - }) - ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) { - parameters := b.parameters() - claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate) - pod := b.podExternal() - zero := int64(0) - pod.Spec.TerminationGracePeriodSeconds = &zero + for host, plugin := range b.driver.Nodes { + ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) + gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) + } + }) - b.create(ctx, parameters, claim, pod) + ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) { + parameters := b.parameters() + pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + for i := range pod.Spec.Containers { + pod.Spec.Containers[i].Resources.Claims = nil + } + b.create(ctx, parameters, pod, template) + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod") + for host, plugin := range b.driver.Nodes { + gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "not claims should be prepared on host %s while pod is running", host) + } + }) - b.testPod(ctx, f.ClientSet, pod) + }) - ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name)) - err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) - if !apierrors.IsNotFound(err) { - framework.ExpectNoError(err, "force delete test pod") - } + ginkgo.Context("with structured parameters", func() { + driver := NewDriver(f, nodes, perNode(1, nodes)) + driver.parameterMode = parameterModeStructured + + f.It("must manage NodeResourceSlice", f.WithSlow(), func(ctx context.Context) { + nodeName := nodes.NodeNames[0] + driverName := driver.Name + m := MethodInstance{nodeName, NodeResourcesMethod} + ginkgo.By("wait for NodeResources call") + gomega.Eventually(ctx, func() error { + if driver.CallCount(m) == 0 { + return errors.New("NodeResources not called yet") + } + return nil + }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) + + ginkgo.By("check if NodeResourceSlice object exists on the API server") + resourceClient := f.ClientSet.ResourceV1alpha2().NodeResourceSlices() + matchSlices := gomega.And( + gomega.HaveLen(1), + gomega.ContainElement(gstruct.MatchAllFields(gstruct.Fields{ + "TypeMeta": gstruct.Ignore(), + "ObjectMeta": gstruct.Ignore(), // TODO: validate ownerref + "NodeName": gomega.Equal(nodes.NodeNames[0]), + "DriverName": gomega.Equal(driver.Name), + 
"NodeResourceModel": gomega.Equal(resourcev1alpha2.NodeResourceModel{NamedResources: &namedresourcesapi.Resources{ + Instances: []namedresourcesapi.Instance{{Name: "instance-0"}}, + }}), + })), + ) + getSlices := func(ctx context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) { + slices, err := resourceClient.List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("nodeName=%s,driverName=%s", nodeName, driverName)}) + if err != nil { + return nil, err + } + return slices.Items, nil + } + gomega.Eventually(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices) + gomega.Consistently(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices) - for host, plugin := range b.driver.Nodes { - ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) - gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) - } - }) + // Removal of node resource slice is tested by the general driver removal code. + }) - ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) { - parameters := b.parameters() - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - for i := range pod.Spec.Containers { - pod.Spec.Containers[i].Resources.Claims = nil - } - b.create(ctx, parameters, pod, template) - framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod") - for host, plugin := range b.driver.Nodes { - gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "not claims should be prepared on host %s while pod is running", host) - } + // TODO: more test scenarios: + // - driver returns "unimplemented" as method response + // - driver returns "Unimplemented" as part of stream + // - driver returns EOF + // - driver changes resources }) }) @@ -241,11 +296,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.It("supports claim and class parameters", func(ctx context.Context) { objects := genParameters() - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) objects = append(objects, pod, template) @@ -263,11 +313,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod, template) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. @@ -298,11 +343,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. 
@@ -340,11 +380,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 72bd9f013c93e..bfc404cc40f0d 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -372,6 +372,7 @@ func (ex *ExamplePlugin) NodeResources(req *drapbv1alpha3.NodeResourcesRequest, // Keep the stream open until the test is done. // TODO: test sending more updates later <-ex.stopCh + ex.logger.Info("Done sending NodeResourcesResponse, closing stream") return nil }
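For completeness, a driver-side handler in the spirit of the test driver's NodeResources above might look like the following sketch. The examplePlugin type and its stopCh field are assumptions for illustration, and a real plugin would also implement the other Node service methods (NodePrepareResources, NodeUnprepareResources):

package exampleplugin

import (
	namedresourcesapi "k8s.io/api/resource/structured/namedresources/v1alpha1"
	resourceapi "k8s.io/api/resource/v1alpha2"
	drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)

// examplePlugin is a hypothetical stand-in for a DRA kubelet plugin.
type examplePlugin struct {
	stopCh <-chan struct{}
}

// NodeResources reports a single named-resources instance once and then keeps
// the stream open until the plugin shuts down; kubelet treats a closed stream
// (EOF) as "no further updates" and keeps the last reported resources.
func (ex *examplePlugin) NodeResources(req *drapbv1alpha3.NodeResourcesRequest, stream drapbv1alpha3.Node_NodeResourcesServer) error {
	resp := &drapbv1alpha3.NodeResourcesResponse{
		Resources: []*resourceapi.NodeResourceModel{{
			NamedResources: &namedresourcesapi.Resources{
				Instances: []namedresourcesapi.Instance{{Name: "instance-0"}},
			},
		}},
	}
	if err := stream.Send(resp); err != nil {
		return err
	}
	<-ex.stopCh
	return nil
}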