diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 9a9338e7705e2..f0a8c020990bf 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -842,6 +842,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend s.FailSwapOn, kubeDeps.Recorder, kubeDeps.KubeClient, + nodeName, ) if err != nil { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 4b3d9ffe9dc2d..0b571028402fe 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -198,7 +198,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) { // TODO(vmarmol): Add limits to the system containers. // Takes the absolute name of the specified containers. // Empty container name disables use of the specified container. -func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) { +func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface, nodeName types.NodeName) (ContainerManager, error) { subsystems, err := GetCgroupSubsystems() if err != nil { return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err) @@ -308,7 +308,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I // initialize DRA manager if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager") - cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir) + cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeName) if err != nil { return nil, err } diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 085366ac0afcf..49cd60cb266d6 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -45,7 +45,7 @@ type ManagerImpl struct { } // NewManagerImpl creates a new manager. 
-func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) { +func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) { klog.V(2).InfoS("Creating DRA manager") claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName) diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 9e5e8c4bd673c..dc0e697d0dd16 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -154,7 +154,7 @@ func TestNewManagerImpl(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory) + manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory, "worker") if test.wantErr { assert.Error(t, err) return @@ -287,7 +287,7 @@ func TestGetResources(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - manager, err := NewManagerImpl(kubeClient, t.TempDir()) + manager, err := NewManagerImpl(kubeClient, t.TempDir(), "worker") assert.NoError(t, err) if test.claimInfo != nil { @@ -760,7 +760,7 @@ func TestPrepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler() + plg := plugin.NewRegistrationHandler(nil, "worker") if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } @@ -1060,7 +1060,7 @@ func TestUnprepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler() + plg := plugin.NewRegistrationHandler(nil, "worker") if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } diff --git a/pkg/kubelet/cm/dra/plugin/client_test.go b/pkg/kubelet/cm/dra/plugin/client_test.go index 3e1889e2c9787..339b57b9d5cdf 100644 --- a/pkg/kubelet/cm/dra/plugin/client_test.go +++ b/pkg/kubelet/cm/dra/plugin/client_test.go @@ -43,6 +43,12 @@ func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in * return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil } +func (f *fakeV1alpha3GRPCServer) NodeResources(req *drapbv1alpha3.NodeResourcesRequest, srv drapbv1alpha3.Node_NodeResourcesServer) error { + srv.Send(&drapbv1alpha3.NodeResourcesResponse{}) + srv.Send(&drapbv1alpha3.NodeResourcesResponse{}) + return nil +} + type fakeV1alpha2GRPCServer struct { drapbv1alpha2.UnimplementedNodeServer } @@ -288,3 +294,82 @@ func TestNodeUnprepareResource(t *testing.T) { }) } } + +func TestNodeResources(t *testing.T) { + for _, test := range []struct { + description string + serverSetup func(string) (string, tearDown, error) + serverVersion string + request *drapbv1alpha3.NodeResourcesRequest + responses []*drapbv1alpha3.NodeResourcesResponse + expectError string + }{ + { + description: "server supports NodeResources API", + serverSetup: setupFakeGRPCServer, + serverVersion: v1alpha3Version, + request: &drapbv1alpha3.NodeResourcesRequest{}, + responses: []*drapbv1alpha3.NodeResourcesResponse{ + {}, + {}, + }, + expectError: "EOF", + }, + { + description: "server doesn't support NodeResources API", + serverSetup: setupFakeGRPCServer, + serverVersion: v1alpha2Version, + request: new(drapbv1alpha3.NodeResourcesRequest), + expectError: "Unimplemented", + }, + } { + t.Run(test.description, func(t 
*testing.T) { + addr, teardown, err := setupFakeGRPCServer(test.serverVersion) + if err != nil { + t.Fatal(err) + } + defer teardown() + + p := &plugin{ + endpoint: addr, + version: v1alpha3Version, + } + + conn, err := p.getOrCreateGRPCConn() + defer func() { + err := conn.Close() + if err != nil { + t.Error(err) + } + }() + if err != nil { + t.Fatal(err) + } + + draPlugins.add("dummy-plugin", p) + defer draPlugins.delete("dummy-plugin") + + client, err := NewDRAPluginClient("dummy-plugin") + if err != nil { + t.Fatal(err) + } + + stream, err := client.NodeResources(context.Background(), test.request) + if err != nil { + t.Fatal(err) + } + var actualResponses []*drapbv1alpha3.NodeResourcesResponse + var actualErr error + for { + resp, err := stream.Recv() + if err != nil { + actualErr = err + break + } + actualResponses = append(actualResponses, resp) + } + assert.Equal(t, test.responses, actualResponses) + assert.Contains(t, actualErr.Error(), test.expectError) + }) + } +} diff --git a/pkg/kubelet/cm/dra/plugin/noderesources.go b/pkg/kubelet/cm/dra/plugin/noderesources.go new file mode 100644 index 0000000000000..94b27f13fc641 --- /dev/null +++ b/pkg/kubelet/cm/dra/plugin/noderesources.go @@ -0,0 +1,447 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugin + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "k8s.io/api/resource/v1alpha2" + resourceapi "k8s.io/api/resource/v1alpha2" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + resourceinformers "k8s.io/client-go/informers/resource/v1alpha2" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3" +) + +const ( + // resyncPeriod for informer + // TODO: disable? + resyncPeriod = time.Duration(10 * time.Minute) +) + +// nodeResourcesController collects resource information from all registered +// plugins and synchronizes that information with NodeResourceSlice objects. +type nodeResourcesController struct { + ctx context.Context + kubeClient kubernetes.Interface + nodeName string + wg sync.WaitGroup + queue workqueue.RateLimitingInterface + sliceStore cache.Store + + mutex sync.RWMutex + activePlugins map[string]*activePlugin +} + +// activePlugin holds the resource information about one plugin +// and the gRPC stream that is used to retrieve that. The context +// used by that stream can be canceled separately to stop +// the monitoring. +type activePlugin struct { + cancel func(reason error) + + // resources is protected by the nodeResourcesController read/write lock. + // When receiving updates from the driver, the entire slice gets replaced, + // so it is okay to not do a deep copy of it. 
Only retrieving the slice + // must be protected by a read lock. + resources []*v1alpha2.NodeResourceModel +} + +// startNodeResourcesController constructs a new controller and starts it. +// +// If a kubeClient is provided, then it synchronizes NodeResourceSlices +// with the resource information provided by plugins. Without it, +// the controller is inactive. +func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, nodeName string) *nodeResourcesController { + if kubeClient == nil { + return nil + } + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithName(logger, "node resources controller") + ctx = klog.NewContext(ctx, logger) + + c := &nodeResourcesController{ + ctx: ctx, + kubeClient: kubeClient, + nodeName: nodeName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"), + activePlugins: make(map[string]*activePlugin), + } + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.run(ctx) + }() + + return c +} + +// waitForStop blocks until all background activity spawned by +// the controller has stopped. The context passed to start must +// be canceled for that to happen. +func (c *nodeResourcesController) waitForStop() { + if c == nil { + return + } + + c.wg.Wait() +} + +// addPlugin is called whenever a plugin has been (re-)registered. +func (c *nodeResourcesController) addPlugin(driverName string, pluginInstance *plugin) { + if c == nil { + return + } + + klog.FromContext(c.ctx).V(2).Info("Adding plugin", "driverName", driverName) + c.mutex.Lock() + defer c.mutex.Unlock() + + if active := c.activePlugins[driverName]; active != nil { + active.cancel(errors.New("plugin has re-registered")) + } + active := &activePlugin{} + cancelCtx, cancel := context.WithCancelCause(c.ctx) + active.cancel = cancel + c.activePlugins[driverName] = active + c.queue.Add(driverName) + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.monitorPlugin(cancelCtx, active, driverName, pluginInstance) + }() +} + +// removePlugin is called whenever a plugin has been unregistered. +func (c *nodeResourcesController) removePlugin(driverName string) { + if c == nil { + return + } + + klog.FromContext(c.ctx).V(2).Info("Removing plugin", "driverName", driverName) + c.mutex.Lock() + defer c.mutex.Unlock() + if active, ok := c.activePlugins[driverName]; ok { + active.cancel(errors.New("plugin has unregistered")) + delete(c.activePlugins, driverName) + c.queue.Add(driverName) + } +} + +func (c *nodeResourcesController) monitorPlugin(ctx context.Context, active *activePlugin, driverName string, pluginInstance *plugin) { + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "driverName", driverName) + logger.Info("Starting to monitor node resources of the plugin") + defer func() { + r := recover() + logger.Info("Stopping to monitor node resources of the plugin", "reason", context.Cause(ctx), "err", ctx.Err(), "recover", r) + }() + + // Keep trying until canceled. + for ctx.Err() == nil { + logger.V(5).Info("Calling NodeResources") + stream, err := pluginInstance.NodeResources(ctx, new(drapb.NodeResourcesRequest)) + if err != nil { + switch { + case status.Convert(err).Code() == codes.Unimplemented: + // The plugin simply doesn't provide node resources. + active.cancel(errors.New("plugin does not support node resource reporting")) + default: + // This is a problem, report it and retry. + logger.Error(err, "Creating gRPC stream for node resources failed") + // TODO: exponential backoff?
+ select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + } + } + continue + } + for { + response, err := stream.Recv() + if err != nil { + switch { + case errors.Is(err, io.EOF): + // This is okay. Some plugins might never change their + // resources after reporting them once. + active.cancel(errors.New("plugin has closed the stream")) + case status.Convert(err).Code() == codes.Unimplemented: + // The plugin has the method but does not really implement it. + active.cancel(errors.New("plugin does not support node resource reporting")) + case ctx.Err() == nil: + // This is a problem, report it and retry. + logger.Error(err, "Reading node resources from gRPC stream failed") + // TODO: exponential backoff? + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + } + } + break + } + + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("Driver resources updated", "resources", response.Resources) + } else { + logger.V(5).Info("Driver resources updated", "numResources", len(response.Resources)) + } + + c.mutex.Lock() + active.resources = response.Resources + c.mutex.Unlock() + c.queue.Add(driverName) + } + } +} + +// run runs in the background. It handles blocking initialization (like +// syncing the informer) and then syncs the actual state with the desired state. +func (c *nodeResourcesController) run(ctx context.Context) { + logger := klog.FromContext(ctx) + + // When kubelet starts, we have two choices: + // - Sync immediately, which in practice will delete all NodeResourceSlices + // because no plugin has registered yet. We could do a DeleteCollection + // to speed this up. + // - Wait a bit, then sync. If all plugins have re-registered in the meantime, + // we might not need to change any NodeResourceSlice. + // + // For now syncing starts immediately, with no DeleteCollection. This + // can be reconsidered later. + + // While kubelet starts up, there are errors: + // E0226 13:41:19.880621 126334 reflector.go:150] k8s.io/client-go@v0.0.0/tools/cache/reflector.go:232: Failed to watch *v1alpha2.NodeResourceSlice: failed to list *v1alpha2.NodeResourceSlice: noderesourceslices.resource.k8s.io is forbidden: User "system:anonymous" cannot list resource "noderesourceslices" in API group "resource.k8s.io" at the cluster scope + // + // The credentials used by kubeClient seem to get swapped out later, + // because eventually these list calls succeed. + // TODO: can we avoid these error log entries? Perhaps wait here? + + // We could use an indexer on driver name, but that seems overkill.
+ informer := resourceinformers.NewFilteredNodeResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) { + options.FieldSelector = "nodeName=" + c.nodeName + }) + c.sliceStore = informer.GetStore() + handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + slice, ok := obj.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + logger.V(5).Info("NodeResourceSlice add", "slice", klog.KObj(slice)) + c.queue.Add(slice.DriverName) + }, + UpdateFunc: func(old, new any) { + oldSlice, ok := old.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + newSlice, ok := new.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + if loggerV := logger.V(6); loggerV.Enabled() { + loggerV.Info("NodeResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice)) + } else { + logger.V(5).Info("NodeResourceSlice update", "slice", klog.KObj(newSlice)) + } + c.queue.Add(newSlice.DriverName) + }, + DeleteFunc: func(obj any) { + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + obj = tombstone.Obj + } + slice, ok := obj.(*resourceapi.NodeResourceSlice) + if !ok { + return + } + logger.V(5).Info("NodeResourceSlice delete", "slice", klog.KObj(slice)) + c.queue.Add(slice.DriverName) + }, + }) + if err != nil { + logger.Error(err, "Registering event handler on the NodeResourceSlice informer failed, disabling resource monitoring") + return + } + + // Start informer and wait for our cache to be populated. + c.wg.Add(1) + go func() { + defer c.wg.Done() + informer.Run(ctx.Done()) + }() + for !handler.HasSynced() { + select { + case <-time.After(time.Second): + case <-ctx.Done(): + return + } + } + logger.Info("NodeResourceSlice informer has synced") + + for c.processNextWorkItem(ctx) { + } +} + +func (c *nodeResourcesController) processNextWorkItem(ctx context.Context) bool { + key, shutdown := c.queue.Get() + if shutdown { + return false + } + defer c.queue.Done(key) + + driverName := key.(string) + var err error + func() { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("internal error: %v", r) + } + }() + err = c.sync(ctx, driverName) + }() + if err == nil { + c.queue.Forget(key) + return true + } + // TODO: contextual logging in utilruntime + utilruntime.HandleError(fmt.Errorf("processing driver %v: %v", driverName, err)) + c.queue.AddRateLimited(key) + + return true +} + +func (c *nodeResourcesController) sync(ctx context.Context, driverName string) error { + logger := klog.FromContext(ctx) + + // Gather information about the actual and desired state. + slices := c.sliceStore.List() + var driverResources []*resourceapi.NodeResourceModel + c.mutex.RLock() + if active, ok := c.activePlugins[driverName]; ok { + // No need for a deep copy, the entire slice gets replaced on writes. + driverResources = active.resources + } + c.mutex.RUnlock() + + // Resources that are not stored in any slice already need to be published. + // Here we track the indices of the stored resources. + storedResources := sets.New[int]() + + // Slices that don't match any driver resource need to be deleted or can be + // updated. + obsoleteSlices := make([]*resourceapi.NodeResourceSlice, 0, len(slices)) + + // Match slices with resource information. 
+NEXTOBJ: + for _, obj := range slices { + slice := obj.(*resourceapi.NodeResourceSlice) + if slice.DriverName != driverName { + continue + } + for index, resource := range driverResources { + if apiequality.Semantic.DeepEqual(&slice.NodeResourceModel, resource) { + storedResources.Insert(index) + continue NEXTOBJ + } + } + obsoleteSlices = append(obsoleteSlices, slice) + } + + if loggerV := logger.V(6); loggerV.Enabled() { + // Dump entire resource information. + loggerV.Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", driverResources) + } else { + klog.V(5).Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(driverResources)) + } + + // Update stale slices before removing what's left. + // + // We don't really know which of these slices might have + // been used for "the" driver resource because they don't + // have a unique ID. In practice, a driver is most likely + // to just give us one NodeResourceModel, in which case + // this isn't a problem at all. If we have more than one, + // then at least conceptually it currently doesn't matter + // where we publish it. + // + // The long-term goal is to move the handling of + // NodeResourceSlice objects into the driver, with kubelet + // just acting as a REST proxy. The advantage of that will + // be that kubelet won't need to support the same + // resource API version as the driver and the control plane. + // With that approach, the driver will be able to match + // up objects more intelligently. + numObsoleteSlices := len(obsoleteSlices) + for index, resource := range driverResources { + if storedResources.Has(index) { + continue + } + if numObsoleteSlices > 0 { + slice := obsoleteSlices[numObsoleteSlices-1] + numObsoleteSlices-- + slice = slice.DeepCopy() + slice.NodeResourceModel = *resource + logger.V(5).Info("Reusing existing node resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("update node resource slice: %w", err) + } + } else { + slice := &resourceapi.NodeResourceSlice{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: c.nodeName + "-" + driverName + "-", + // TODO: node object as owner + }, + NodeName: c.nodeName, + DriverName: driverName, + NodeResourceModel: *resource, + } + logger.V(5).Info("Creating new node resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create node resource slice: %w", err) + } + } + } + + // All remaining slices are truly orphaned. 
+ for i := 0; i < numObsoleteSlices; i++ { + slice := obsoleteSlices[i] + logger.V(5).Info("Deleting obsolete node resource slice", "slice", klog.KObj(slice)) + if err := c.kubeClient.ResourceV1alpha2().NodeResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("delete node resource slice: %w", err) + } + } + + return nil +} diff --git a/pkg/kubelet/cm/dra/plugin/plugin.go b/pkg/kubelet/cm/dra/plugin/plugin.go index 94a9c7354de55..7acdec5f5300c 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin.go +++ b/pkg/kubelet/cm/dra/plugin/plugin.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" utilversion "k8s.io/apimachinery/pkg/util/version" + "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) @@ -94,11 +95,23 @@ func (p *plugin) setVersion(version string) { } // RegistrationHandler is the handler which is fed to the pluginwatcher API. -type RegistrationHandler struct{} +type RegistrationHandler struct { + controller *nodeResourcesController +} // NewPluginHandler returns new registration handler. -func NewRegistrationHandler() *RegistrationHandler { - return &RegistrationHandler{} +// +// Must only be called once per process because it manages global state. +// If a kubeClient is provided, then it synchronizes NodeResourceSlices +// with the resource information provided by plugins. +func NewRegistrationHandler(kubeClient kubernetes.Interface, nodeName string) *RegistrationHandler { + handler := &RegistrationHandler{} + + // If kubelet ever gets an API for stopping registration handlers, then + // that would need to be hooked up with stopping the controller. + handler.controller = startNodeResourcesController(context.TODO(), kubeClient, nodeName) + + return handler } // RegisterPlugin is called when a plugin can be registered. @@ -110,15 +123,18 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, return err } - // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key - // all other DRA components will be able to get the actual socket of DRA plugins by its name. - // By default we assume the supported plugin version is v1alpha3 - draPlugins.add(pluginName, &plugin{ + pluginInstance := &plugin{ conn: nil, endpoint: endpoint, version: v1alpha3Version, highestSupportedVersion: highestSupportedVersion, - }) + } + + // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key + // all other DRA components will be able to get the actual socket of DRA plugins by its name. 
+ // By default we assume the supported plugin version is v1alpha3 + draPlugins.add(pluginName, pluginInstance) + h.controller.addPlugin(pluginName, pluginInstance) return nil } @@ -178,6 +194,7 @@ func deregisterPlugin(pluginName string) { func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { klog.InfoS("DeRegister DRA plugin", "name", pluginName) deregisterPlugin(pluginName) + h.controller.removePlugin(pluginName) } // ValidatePlugin is called by kubelet's plugin watcher upon detection diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 70499b260c848..f9d70238f6018 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -23,6 +23,10 @@ import ( ) func TestRegistrationHandler_ValidatePlugin(t *testing.T) { + newRegistrationHandler := func() *RegistrationHandler { + return NewRegistrationHandler(nil, "worker") + } + for _, test := range []struct { description string handler func() *RegistrationHandler @@ -33,19 +37,19 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { }{ { description: "no versions provided", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, shouldError: true, }, { description: "unsupported version", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, versions: []string{"v2.0.0"}, shouldError: true, }, { description: "plugin already registered with a higher supported version", handler: func() *RegistrationHandler { - handler := NewRegistrationHandler() + handler := newRegistrationHandler() if err := handler.RegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide", "", []string{"v1.1.0"}); err != nil { t.Fatal(err) } @@ -57,7 +61,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { }, { description: "should validate the plugin", - handler: NewRegistrationHandler, + handler: newRegistrationHandler, pluginName: "this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide", versions: []string{"v1.3.0"}, }, @@ -74,7 +78,7 @@ func TestRegistrationHandler_ValidatePlugin(t *testing.T) { } t.Cleanup(func() { - handler := NewRegistrationHandler() + handler := newRegistrationHandler() handler.DeRegisterPlugin("this-plugin-already-exists-and-has-a-long-name-so-it-doesnt-collide") handler.DeRegisterPlugin("this-is-a-dummy-plugin-with-a-long-name-so-it-doesnt-collide") }) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index dbf8f554c07f6..d3b5e76077e69 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1541,7 +1541,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) // Adding Registration Callback function for DRA Plugin if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { - kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler())) + kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.hostname))) } // Adding Registration Callback function for Device Manager kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 37468eae1b283..46a3bbe683ad3 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -34,6 +34,7 @@ import ( appsv1 "k8s.io/api/apps/v1" 
v1 "k8s.io/api/core/v1" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -54,6 +55,7 @@ const ( NodePrepareResourcesMethod = "/v1alpha3.Node/NodePrepareResources" NodeUnprepareResourceMethod = "/v1alpha2.Node/NodeUnprepareResource" NodeUnprepareResourcesMethod = "/v1alpha3.Node/NodeUnprepareResources" + NodeResourcesMethod = "/v1alpha3.Node/NodeResources" ) type Nodes struct { @@ -105,6 +107,7 @@ func NewDriver(f *framework.Framework, nodes *Nodes, configureResources func() a // not run on all nodes. resources.Nodes = nodes.NodeNames } + ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last. d.SetUp(nodes, resources) ginkgo.DeferCleanup(d.TearDown) }) @@ -280,6 +283,9 @@ func (d *Driver) SetUp(nodes *Nodes, resources app.Resources) { kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { return d.interceptor(nodename, ctx, req, info, handler) }), + kubeletplugin.GRPCStreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) { + return d.streamInterceptor(nodename, srv, ss, info, handler) + }), kubeletplugin.PluginListener(listen(ctx, d.f, pod.Name, "plugin", 9001)), kubeletplugin.RegistrarListener(listen(ctx, d.f, pod.Name, "registrar", 9000)), kubeletplugin.KubeletPluginSocketPath(draAddr), @@ -355,6 +361,16 @@ func (d *Driver) TearDown() { d.wg.Wait() } +func (d *Driver) IsGone(ctx context.Context) { + gomega.Eventually(ctx, func(ctx context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) { + slices, err := d.f.ClientSet.ResourceV1alpha2().NodeResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: "driverName=" + d.Name}) + if err != nil { + return nil, err + } + return slices.Items, err + }).Should(gomega.BeEmpty()) +} + func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { d.mutex.Lock() defer d.mutex.Unlock() @@ -368,6 +384,22 @@ func (d *Driver) interceptor(nodename string, ctx context.Context, req interface return handler(ctx, req) } +func (d *Driver) streamInterceptor(nodename string, srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // Stream calls block for a long time. We must not hold the lock while + // they are running. 
+ d.mutex.Lock() + m := MethodInstance{nodename, info.FullMethod} + d.callCounts[m]++ + fail := d.fail[m] + d.mutex.Unlock() + + if fail { + return errors.New("injected error") + } + + return handler(srv, stream) +} + func (d *Driver) Fail(m MethodInstance, injectError bool) { d.mutex.Lock() defer d.mutex.Unlock() diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index b88357d8db14c..557c8a4e49645 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -28,6 +28,7 @@ import ( "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" "github.com/onsi/gomega/gcustom" + "github.com/onsi/gomega/gstruct" v1 "k8s.io/api/core/v1" namedresourcesapi "k8s.io/api/resource/structured/namedresources/v1alpha1" @@ -82,116 +83,170 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.Context("kubelet", func() { nodes := NewNodes(f, 1, 1) - driver := NewDriver(f, nodes, networkResources) // All tests get their own driver instance. - b := newBuilder(f, driver) - ginkgo.It("registers plugin", func() { - ginkgo.By("the driver is running") - }) + ginkgo.Context("with ConfigMap parameters", func() { + driver := NewDriver(f, nodes, networkResources) + b := newBuilder(f, driver) - ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) { - // We have exactly one host. - m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod} + ginkgo.It("registers plugin", func() { + ginkgo.By("the driver is running") + }) - driver.Fail(m, true) + ginkgo.It("must retry NodePrepareResources", func(ctx context.Context) { + // We have exactly one host. + m := MethodInstance{driver.Nodenames()[0], NodePrepareResourcesMethod} - ginkgo.By("waiting for container startup to fail") - parameters := b.parameters() - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + driver.Fail(m, true) - b.create(ctx, parameters, pod, template) + ginkgo.By("waiting for container startup to fail") + parameters := b.parameters() + pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - ginkgo.By("wait for NodePrepareResources call") - gomega.Eventually(ctx, func(ctx context.Context) error { - if driver.CallCount(m) == 0 { - return errors.New("NodePrepareResources not called yet") + b.create(ctx, parameters, pod, template) + + ginkgo.By("wait for NodePrepareResources call") + gomega.Eventually(ctx, func(ctx context.Context) error { + if driver.CallCount(m) == 0 { + return errors.New("NodePrepareResources not called yet") + } + return nil + }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) + + ginkgo.By("allowing container startup to succeed") + callCount := driver.CallCount(m) + driver.Fail(m, false) + err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace) + framework.ExpectNoError(err, "start pod with inline resource claim") + if driver.CallCount(m) == callCount { + framework.Fail("NodePrepareResources should have been called again") } - return nil - }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) - - ginkgo.By("allowing container startup to succeed") - callCount := driver.CallCount(m) - driver.Fail(m, false) - err := e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace) - framework.ExpectNoError(err, "start pod with inline resource claim") - if driver.CallCount(m) == callCount { - framework.Fail("NodePrepareResources should have been called again") - } - }) + }) - ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx 
context.Context) { - // Pretend that the resource is allocated and reserved for some other entity. - // Until the resourceclaim controller learns to remove reservations for - // arbitrary types we can simply fake somthing here. - claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - b.create(ctx, claim) - claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) - framework.ExpectNoError(err, "get claim") - claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} - claim.Status.DriverName = driver.Name - claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ - APIGroup: "example.com", - Resource: "some", - Name: "thing", - UID: "12345", + ginkgo.It("must not run a pod if a claim is not reserved for it", func(ctx context.Context) { + // Pretend that the resource is allocated and reserved for some other entity. + // Until the resourceclaim controller learns to remove reservations for + // arbitrary types we can simply fake somthing here. + claim := b.externalClaim(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + b.create(ctx, claim) + claim, err := f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).Get(ctx, claim.Name, metav1.GetOptions{}) + framework.ExpectNoError(err, "get claim") + claim.Status.Allocation = &resourcev1alpha2.AllocationResult{} + claim.Status.DriverName = driver.Name + claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ + APIGroup: "example.com", + Resource: "some", + Name: "thing", + UID: "12345", + }) + _, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) + framework.ExpectNoError(err, "update claim") + + pod := b.podExternal() + + // This bypasses scheduling and therefore the pod gets + // to run on the node although it never gets added to + // the `ReservedFor` field of the claim. + pod.Spec.NodeName = nodes.NodeNames[0] + b.create(ctx, pod) + + gomega.Consistently(ctx, func(ctx context.Context) error { + testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err) + } + if testPod.Status.Phase != v1.PodPending { + return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending) + } + return nil + }, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) }) - _, err = f.ClientSet.ResourceV1alpha2().ResourceClaims(f.Namespace.Name).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) - framework.ExpectNoError(err, "update claim") - pod := b.podExternal() + ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) { + parameters := b.parameters() + claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate) + pod := b.podExternal() + zero := int64(0) + pod.Spec.TerminationGracePeriodSeconds = &zero - // This bypasses scheduling and therefore the pod gets - // to run on the node although it never gets added to - // the `ReservedFor` field of the claim. 
- pod.Spec.NodeName = nodes.NodeNames[0] - b.create(ctx, pod) + b.create(ctx, parameters, claim, pod) - gomega.Consistently(ctx, func(ctx context.Context) error { - testPod, err := b.f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("expected the test pod %s to exist: %w", pod.Name, err) - } - if testPod.Status.Phase != v1.PodPending { - return fmt.Errorf("pod %s: unexpected status %s, expected status: %s", pod.Name, testPod.Status.Phase, v1.PodPending) + b.testPod(ctx, f.ClientSet, pod) + + ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name)) + err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) + if !apierrors.IsNotFound(err) { + framework.ExpectNoError(err, "force delete test pod") } - return nil - }, 20*time.Second, 200*time.Millisecond).Should(gomega.BeNil()) - }) - ginkgo.It("must unprepare resources for force-deleted pod", func(ctx context.Context) { - parameters := b.parameters() - claim := b.externalClaim(resourcev1alpha2.AllocationModeImmediate) - pod := b.podExternal() - zero := int64(0) - pod.Spec.TerminationGracePeriodSeconds = &zero + for host, plugin := range b.driver.Nodes { + ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) + gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) + } + }) - b.create(ctx, parameters, claim, pod) + ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) { + parameters := b.parameters() + pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) + for i := range pod.Spec.Containers { + pod.Spec.Containers[i].Resources.Claims = nil + } + b.create(ctx, parameters, pod, template) + framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod") + for host, plugin := range b.driver.Nodes { + gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "not claims should be prepared on host %s while pod is running", host) + } + }) - b.testPod(ctx, f.ClientSet, pod) + }) - ginkgo.By(fmt.Sprintf("force delete test pod %s", pod.Name)) - err := b.f.ClientSet.CoreV1().Pods(b.f.Namespace.Name).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}) - if !apierrors.IsNotFound(err) { - framework.ExpectNoError(err, "force delete test pod") - } + ginkgo.Context("with structured parameters", func() { + driver := NewDriver(f, nodes, perNode(1, nodes)) + driver.parameterMode = parameterModeStructured + + f.It("must manage NodeResourceSlice", f.WithSlow(), func(ctx context.Context) { + nodeName := nodes.NodeNames[0] + driverName := driver.Name + m := MethodInstance{nodeName, NodeResourcesMethod} + ginkgo.By("wait for NodeResources call") + gomega.Eventually(ctx, func() error { + if driver.CallCount(m) == 0 { + return errors.New("NodeResources not called yet") + } + return nil + }).WithTimeout(podStartTimeout).Should(gomega.Succeed()) + + ginkgo.By("check if NodeResourceSlice object exists on the API server") + resourceClient := f.ClientSet.ResourceV1alpha2().NodeResourceSlices() + matchSlices := gomega.And( + gomega.HaveLen(1), + gomega.ContainElement(gstruct.MatchAllFields(gstruct.Fields{ + "TypeMeta": gstruct.Ignore(), + "ObjectMeta": gstruct.Ignore(), // TODO: validate ownerref + "NodeName": gomega.Equal(nodes.NodeNames[0]), + "DriverName": gomega.Equal(driver.Name), + 
"NodeResourceModel": gomega.Equal(resourcev1alpha2.NodeResourceModel{NamedResources: &namedresourcesapi.Resources{ + Instances: []namedresourcesapi.Instance{{Name: "instance-0"}}, + }}), + })), + ) + getSlices := func(ctx context.Context) ([]resourcev1alpha2.NodeResourceSlice, error) { + slices, err := resourceClient.List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("nodeName=%s,driverName=%s", nodeName, driverName)}) + if err != nil { + return nil, err + } + return slices.Items, nil + } + gomega.Eventually(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices) + gomega.Consistently(ctx, getSlices).WithTimeout(20 * time.Second).Should(matchSlices) - for host, plugin := range b.driver.Nodes { - ginkgo.By(fmt.Sprintf("waiting for resources on %s to be unprepared", host)) - gomega.Eventually(plugin.GetPreparedResources).WithTimeout(time.Minute).Should(gomega.BeEmpty(), "prepared claims on host %s", host) - } - }) + // Removal of node resource slice is tested by the general driver removal code. + }) - ginkgo.It("must skip NodePrepareResource if not used by any container", func(ctx context.Context) { - parameters := b.parameters() - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) - for i := range pod.Spec.Containers { - pod.Spec.Containers[i].Resources.Claims = nil - } - b.create(ctx, parameters, pod, template) - framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod), "start pod") - for host, plugin := range b.driver.Nodes { - gomega.Expect(plugin.GetPreparedResources()).Should(gomega.BeEmpty(), "not claims should be prepared on host %s while pod is running", host) - } + // TODO: more test scenarios: + // - driver returns "unimplemented" as method response + // - driver returns "Unimplemented" as part of stream + // - driver returns EOF + // - driver changes resources }) }) @@ -241,11 +296,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, ginkgo.It("supports claim and class parameters", func(ctx context.Context) { objects := genParameters() - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - pod, template := b.podInline(resourcev1alpha2.AllocationModeWaitForFirstConsumer) objects = append(objects, pod, template) @@ -263,11 +313,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod, template) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. @@ -298,11 +343,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. 
@@ -340,11 +380,6 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, objects = append(objects, pod) } - // TODO: replace with publishing NodeResourceSlice through kubelet - if parameterMode == parameterModeTranslated || parameterMode == parameterModeStructured { - objects = append(objects, b.nodeResourceSlice(nodes.NodeNames[0], maxAllocations)) - } - b.create(ctx, objects...) // We don't know the order. All that matters is that all of them get scheduled eventually. diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 72bd9f013c93e..bfc404cc40f0d 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -372,6 +372,7 @@ func (ex *ExamplePlugin) NodeResources(req *drapbv1alpha3.NodeResourcesRequest, // Keep the stream open until the test is done. // TODO: test sending more updates later <-ex.stopCh + ex.logger.Info("Done sending NodeResourcesResponse, closing stream") return nil }
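
For reviewers, a minimal, hypothetical sketch of the driver side of the new stream, i.e. what a DRA kubelet plugin serves and what the new nodeResourcesController consumes. It is not part of this patch: the examplePlugin type, the package name, and the "instance-0" instance are made up, modeled on the fake gRPC server in client_test.go and the e2e test driver above, and it assumes the generated v1alpha3 bindings provide UnimplementedNodeServer just like the v1alpha2 bindings used in the tests.

package example

import (
	namedresourcesapi "k8s.io/api/resource/structured/namedresources/v1alpha1"
	resourceapi "k8s.io/api/resource/v1alpha2"
	drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)

// examplePlugin is a hypothetical DRA driver; only NodeResources is sketched here.
type examplePlugin struct {
	drapbv1alpha3.UnimplementedNodeServer // assumed to exist for v1alpha3, mirroring the v1alpha2 fake server
}

// NodeResources reports the driver's node-local resources once and then keeps
// the stream open. Kubelet stores the resources from each response and
// publishes them as NodeResourceSlice objects for the node; returning early
// instead surfaces to kubelet as io.EOF ("resources will not change").
func (p *examplePlugin) NodeResources(req *drapbv1alpha3.NodeResourcesRequest, stream drapbv1alpha3.Node_NodeResourcesServer) error {
	resources := []*resourceapi.NodeResourceModel{{
		NamedResources: &namedresourcesapi.Resources{
			Instances: []namedresourcesapi.Instance{{Name: "instance-0"}},
		},
	}}
	if err := stream.Send(&drapbv1alpha3.NodeResourcesResponse{Resources: resources}); err != nil {
		return err
	}
	// Block until kubelet cancels the stream, e.g. when the plugin deregisters.
	<-stream.Context().Done()
	return nil
}

The streaming shape lets a driver push updates later simply by calling Send again: the controller replaces the stored resources for that driver and re-queues it for syncing against the published NodeResourceSlices.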