Skip to content

Commit

Permalink
dra kubelet: introduce streaming API for retrieving resources
Browse files Browse the repository at this point in the history
This is backwards compatible with old DRA driver kubelet plugins, their gRPC
server will return "not implemented" and that can be handled by
kubelet. Therefore no API break is needed.

However, DRA drivers need to be updated because the Go API changed. They can
return
    status.New(codes.Unimplemented, "no node resource support").Err()
if they don't support the new NodeResources method and structured parameters.
  • Loading branch information
pohly committed Feb 26, 2024
1 parent e060349 commit 2117475
Show file tree
Hide file tree
Showing 10 changed files with 566 additions and 52 deletions.
17 changes: 17 additions & 0 deletions pkg/kubelet/cm/dra/plugin/client.go
Expand Up @@ -198,3 +198,20 @@ func (p *plugin) NodeUnprepareResources(
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
return response, err
}

func (p *plugin) NodeResources(
ctx context.Context,
req *drapb.NodeResourcesRequest,
opts ...grpc.CallOption,
) (drapb.Node_NodeResourcesClient, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodeResources rpc"), "request", req)

conn, err := p.getOrCreateGRPCConn()
if err != nil {
return nil, err
}

nodeClient := drapb.NewNodeClient(conn)
return nodeClient.NodeResources(ctx, req, opts...)
}
Expand Up @@ -145,6 +145,15 @@ func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
}
}

// GRPCStreamInterceptor is called for each gRPC streaming method call. This option
// may be used more than once and each interceptor will get called.
func GRPCStreamInterceptor(interceptor grpc.StreamServerInterceptor) Option {
return func(o *options) error {
o.streamInterceptors = append(o.streamInterceptors, interceptor)
return nil
}
}

// NodeV1alpha2 explicitly chooses whether the DRA gRPC API v1alpha2
// gets enabled.
func NodeV1alpha2(enabled bool) Option {
Expand All @@ -171,6 +180,7 @@ type options struct {
draAddress string
pluginRegistrationEndpoint endpoint
interceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor

nodeV1alpha2, nodeV1alpha3 bool
}
Expand Down Expand Up @@ -215,7 +225,7 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e

// Run the node plugin gRPC server first to ensure that it is ready.
implemented := false
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.interceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
plugin, err := startGRPCServer(klog.LoggerWithName(o.logger, "dra"), o.grpcVerbosity, o.interceptors, o.streamInterceptors, o.draEndpoint, func(grpcServer *grpc.Server) {
if nodeServer, ok := nodeServer.(drapbv1alpha3.NodeServer); ok && o.nodeV1alpha3 {
o.logger.V(5).Info("registering drapbv1alpha3.NodeServer")
drapbv1alpha3.RegisterNodeServer(grpcServer, nodeServer)
Expand Down Expand Up @@ -246,7 +256,7 @@ func Start(nodeServer interface{}, opts ...Option) (result DRAPlugin, finalErr e
}

// Now make it available to kubelet.
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.interceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
registrar, err := startRegistrar(klog.LoggerWithName(o.logger, "registrar"), o.grpcVerbosity, o.interceptors, o.streamInterceptors, o.driverName, o.draAddress, o.pluginRegistrationEndpoint)
if err != nil {
return nil, fmt.Errorf("start registrar: %v", err)
}
Expand Down
Expand Up @@ -31,7 +31,7 @@ type nodeRegistrar struct {
}

// startRegistrar returns a running instance.
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, driverName string, endpoint string, pluginRegistrationEndpoint endpoint) (*nodeRegistrar, error) {
n := &nodeRegistrar{
logger: logger,
registrationServer: registrationServer{
Expand All @@ -40,7 +40,7 @@ func startRegistrar(logger klog.Logger, grpcVerbosity int, interceptors []grpc.U
supportedVersions: []string{"1.0.0"}, // TODO: is this correct?
},
}
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
s, err := startGRPCServer(logger, grpcVerbosity, interceptors, streamInterceptors, pluginRegistrationEndpoint, func(grpcServer *grpc.Server) {
registerapi.RegisterRegistrationServer(grpcServer, n)
})
if err != nil {
Expand Down
Expand Up @@ -54,7 +54,7 @@ type endpoint struct {

// startGRPCServer sets up the GRPC server on a Unix domain socket and spawns a goroutine
// which handles requests for arbitrary services.
func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc.UnaryServerInterceptor, streamInterceptors []grpc.StreamServerInterceptor, endpoint endpoint, services ...registerService) (*grpcServer, error) {
s := &grpcServer{
logger: logger,
endpoint: endpoint,
Expand All @@ -80,11 +80,14 @@ func startGRPCServer(logger klog.Logger, grpcVerbosity int, interceptors []grpc.
// shutting down, so we don't need to do that.
var opts []grpc.ServerOption
var finalInterceptors []grpc.UnaryServerInterceptor
var finalStreamInterceptors []grpc.StreamServerInterceptor
if grpcVerbosity >= 0 {
finalInterceptors = append(finalInterceptors, s.interceptor)
}
finalInterceptors = append(finalInterceptors, interceptors...)
finalStreamInterceptors = append(finalStreamInterceptors, streamInterceptors...)
opts = append(opts, grpc.ChainUnaryInterceptor(finalInterceptors...))
opts = append(opts, grpc.ChainStreamInterceptor(finalStreamInterceptors...))
s.server = grpc.NewServer(opts...)
for _, service := range services {
service(s.server)
Expand Down

0 comments on commit 2117475

Please sign in to comment.