Skip to content

Commit

Permalink
dra kubelet: implement NodeResourceSlice controller
Browse files Browse the repository at this point in the history
kubelet is responsible for publishing the NodeResourceSlice objects for its
node. The new controller deletes any extra objects that don't match any
registered driver, updates existing ones, and creates new ones as needed to
ensure that the information provided by drivers is replicated correctly in the
cluster.
  • Loading branch information
pohly committed Feb 26, 2024
1 parent 2117475 commit 16b2228
Show file tree
Hide file tree
Showing 12 changed files with 754 additions and 132 deletions.
1 change: 1 addition & 0 deletions cmd/kubelet/app/server.go
Expand Up @@ -842,6 +842,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
s.FailSwapOn,
kubeDeps.Recorder,
kubeDeps.KubeClient,
nodeName,
)

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/container_manager_linux.go
Expand Up @@ -198,7 +198,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface, nodeName types.NodeName) (ContainerManager, error) {
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
Expand Down Expand Up @@ -308,7 +308,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
// initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir)
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeName)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/dra/manager.go
Expand Up @@ -45,7 +45,7 @@ type ManagerImpl struct {
}

// NewManagerImpl creates a new manager.
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) {
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) {
klog.V(2).InfoS("Creating DRA manager")

claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubelet/cm/dra/manager_test.go
Expand Up @@ -154,7 +154,7 @@ func TestNewManagerImpl(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory)
manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory, "worker")
if test.wantErr {
assert.Error(t, err)
return
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestGetResources(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
manager, err := NewManagerImpl(kubeClient, t.TempDir())
manager, err := NewManagerImpl(kubeClient, t.TempDir(), "worker")
assert.NoError(t, err)

if test.claimInfo != nil {
Expand Down Expand Up @@ -760,7 +760,7 @@ func TestPrepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()

plg := plugin.NewRegistrationHandler()
plg := plugin.NewRegistrationHandler(nil, "worker")
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestUnprepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()

plg := plugin.NewRegistrationHandler()
plg := plugin.NewRegistrationHandler(nil, "worker")
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
Expand Down
85 changes: 85 additions & 0 deletions pkg/kubelet/cm/dra/plugin/client_test.go
Expand Up @@ -43,6 +43,12 @@ func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in *
return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil
}

// NodeResources is the fake v1alpha3 implementation of the streaming
// NodeResources call. It sends two empty responses on the stream and then
// returns nil. If sending a response fails, that error is returned to the
// caller instead of being silently dropped.
func (f *fakeV1alpha3GRPCServer) NodeResources(req *drapbv1alpha3.NodeResourcesRequest, srv drapbv1alpha3.Node_NodeResourcesServer) error {
	// Two responses, so that a client can verify that it receives more than
	// one message from the stream.
	for i := 0; i < 2; i++ {
		if err := srv.Send(&drapbv1alpha3.NodeResourcesResponse{}); err != nil {
			return err
		}
	}
	return nil
}

type fakeV1alpha2GRPCServer struct {
drapbv1alpha2.UnimplementedNodeServer
}
Expand Down Expand Up @@ -288,3 +294,82 @@ func TestNodeUnprepareResource(t *testing.T) {
})
}
}

// TestNodeResources exercises the NodeResources streaming call of the DRA
// plugin client against fake gRPC servers of different versions. For each
// case it reads the stream until Recv returns an error, then compares the
// collected responses and checks that the terminating error contains the
// expected text.
func TestNodeResources(t *testing.T) {
	for _, test := range []struct {
		description   string
		serverSetup   func(string) (string, tearDown, error)
		serverVersion string
		request       *drapbv1alpha3.NodeResourcesRequest
		responses     []*drapbv1alpha3.NodeResourcesResponse
		expectError   string
	}{
		{
			description:   "server supports NodeResources API",
			serverSetup:   setupFakeGRPCServer,
			serverVersion: v1alpha3Version,
			request:       &drapbv1alpha3.NodeResourcesRequest{},
			responses: []*drapbv1alpha3.NodeResourcesResponse{
				{},
				{},
			},
			// The stream is read until Recv fails, so even the successful
			// case terminates with an error.
			expectError: "EOF",
		},
		{
			description:   "server doesn't support NodeResources API",
			serverSetup:   setupFakeGRPCServer,
			serverVersion: v1alpha2Version,
			request:       new(drapbv1alpha3.NodeResourcesRequest),
			expectError:   "Unimplemented",
		},
	} {
		t.Run(test.description, func(t *testing.T) {
			// Use the setup function from the test case so that the
			// serverSetup field is actually honored.
			addr, teardown, err := test.serverSetup(test.serverVersion)
			if err != nil {
				t.Fatal(err)
			}
			defer teardown()

			p := &plugin{
				endpoint: addr,
				version:  v1alpha3Version,
			}

			// Check the error before registering the deferred Close: conn
			// cannot be relied upon when err != nil, and closing it then
			// could panic and hide the real failure.
			conn, err := p.getOrCreateGRPCConn()
			if err != nil {
				t.Fatal(err)
			}
			defer func() {
				if err := conn.Close(); err != nil {
					t.Error(err)
				}
			}()

			draPlugins.add("dummy-plugin", p)
			defer draPlugins.delete("dummy-plugin")

			client, err := NewDRAPluginClient("dummy-plugin")
			if err != nil {
				t.Fatal(err)
			}

			stream, err := client.NodeResources(context.Background(), test.request)
			if err != nil {
				t.Fatal(err)
			}

			// Drain the stream. The loop only exits through the error
			// branch, so actualErr is always non-nil afterwards.
			var actualResponses []*drapbv1alpha3.NodeResourcesResponse
			var actualErr error
			for {
				resp, err := stream.Recv()
				if err != nil {
					actualErr = err
					break
				}
				actualResponses = append(actualResponses, resp)
			}
			assert.Equal(t, test.responses, actualResponses)
			assert.Contains(t, actualErr.Error(), test.expectError)
		})
	}
}

0 comments on commit 16b2228

Please sign in to comment.