cephfs: add support for VolumeCondition in NodeGetVolumeStats operation #4125

Draft · wants to merge 4 commits into base: devel
25 changes: 25 additions & 0 deletions docs/design/proposals/volume-condition.md
@@ -0,0 +1,25 @@
# Support for CSI `VolumeCondition`, a.k.a. the Volume Health Checker

## health-checker API

The Manager for health-checking is implemented under `internal/health-checker`.
The Manager can start a checking process for a given path, report the
(un)healthy state, and stop the checking process when the volume is no longer
needed.

The Manager is responsible for creating a suitable checker for the requested
path. If the path is a block-device, the BlockChecker should be created. For a
filesystem path (directory), the FileChecker is appropriate.
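
The following is a rough sketch of the Manager interface, inferred from how the
node-server calls it in this PR (`StartChecker`, `StopChecker`, `IsHealthy`);
the exact signatures under `internal/health-checker` may differ:

```go
package healthchecker

// Manager starts, stops and queries health-checkers for volume paths.
// Illustrative outline only, not the final API.
type Manager interface {
	// StartChecker starts a FileChecker or BlockChecker (depending on the
	// type of the path) that periodically verifies the volume is usable.
	StartChecker(path string) error

	// StopChecker stops the checker for the given path, e.g. when the
	// volume is unstaged.
	StopChecker(path string)

	// IsHealthy reports the result of the most recent check; when the
	// volume is unhealthy, the returned error describes the failure.
	IsHealthy(path string) (bool, error)
}
```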

## CephFS

The health-checker writes to the file `csi-volume-condition.ts` in the root of
the volume. This file contains a JSON-formatted timestamp.
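
The exact file contents are not spelled out in this proposal; purely as an
illustration, a FileChecker could refresh the timestamp roughly as below (the
function name and JSON layout are assumptions):

```go
package healthchecker

import (
	"encoding/json"
	"os"
	"path/filepath"
	"time"
)

// writeVolumeCondition stores the current time, JSON-encoded, in the
// csi-volume-condition.ts file at the root of the volume. Sketch only;
// the real FileChecker may use a different layout.
func writeVolumeCondition(volumeRoot string) error {
	ts, err := json.Marshal(time.Now())
	if err != nil {
		return err
	}

	return os.WriteFile(filepath.Join(volumeRoot, "csi-volume-condition.ts"), ts, 0o600)
}
```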

A new `data` directory is introduced for newly created volumes. During the
`NodeStageVolume` call the root of the volume is mounted, and the `data`
directory is bind-mounted inside the container when `NodePublishVolume` is
called.

The `data` directory makes it possible to place Ceph-CSI internal files in the
root of the volume, without giving the user/application access to them.
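
As an illustration, the resulting layout and the path selection done in
`NodePublishVolume` look roughly like this (the helper `dataPathFor` is
hypothetical; the PR inlines the logic):

```go
package cephfs

import "path"

// Illustrative layout after NodeStageVolume:
//
//	stagingTargetPath/            root of the subvolume (Ceph-CSI internal files live here)
//	├── csi-volume-condition.ts   written by the health-checker
//	└── data/                     bind-mounted to the targetPath by NodePublishVolume
//
// dataPathFor returns the bind-mount source for NodePublishVolume: the
// "dataRoot" subdirectory when the volume context provides one, otherwise
// the staged root itself (e.g. for volumes created before this change).
func dataPathFor(stagingTargetPath string, volumeContext map[string]string) string {
	if dataRoot, ok := volumeContext["dataRoot"]; ok {
		return path.Join(stagingTargetPath, dataRoot)
	}

	return stagingTargetPath
}
```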
18 changes: 18 additions & 0 deletions internal/cephfs/controllerserver.go
@@ -40,6 +40,12 @@ import (
"google.golang.org/grpc/status"
)

// defaultDataRoot is a directory on the volume that will be mounted inside the
// container during NodePublishVolume. The parent directory of defaultDataRoot
// will be mounted during NodeStageVolume, and is available for Ceph-CSI
// internal consumption.
const defaultDataRoot = "data"

// ControllerServer struct of CEPH CSI driver with supported methods of CSI
// controller server spec.
type ControllerServer struct {
@@ -365,6 +371,12 @@ func (cs *ControllerServer) CreateVolume(
volumeContext := k8s.RemoveCSIPrefixedParameters(req.GetParameters())
volumeContext["subvolumeName"] = vID.FsSubvolName
volumeContext["subvolumePath"] = volOptions.RootPath

if volOptions.DataRoot == "" {
volOptions.DataRoot = defaultDataRoot
}
volumeContext["dataRoot"] = volOptions.DataRoot

volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
@@ -456,6 +468,12 @@ func (cs *ControllerServer) CreateVolume(
volumeContext := k8s.RemoveCSIPrefixedParameters(req.GetParameters())
volumeContext["subvolumeName"] = vID.FsSubvolName
volumeContext["subvolumePath"] = volOptions.RootPath

if volOptions.DataRoot == "" {
volOptions.DataRoot = defaultDataRoot
}
volumeContext["dataRoot"] = volOptions.DataRoot

volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
2 changes: 2 additions & 0 deletions internal/cephfs/driver.go
@@ -25,6 +25,7 @@ import (
casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@@ -82,6 +83,7 @@ func NewNodeServer(
VolumeLocks: util.NewVolumeLocks(),
kernelMountOptions: kernelMountOptions,
fuseMountOptions: fuseMountOptions,
healthChecker: hc.NewHealthCheckManager(),
}
}

59 changes: 54 additions & 5 deletions internal/cephfs/nodeserver.go
@@ -29,6 +29,7 @@ import (
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
@@ -47,6 +48,7 @@ type NodeServer struct {
VolumeLocks *util.VolumeLocks
kernelMountOptions string
fuseMountOptions string
healthChecker hc.Manager
}

func getCredentialsForVolume(
@@ -209,6 +211,8 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.Internal, err.Error())
}

healthCheckPath := getHealthCheckPath(stagingTargetPath, req.GetVolumeContext())

// Check if the volume is already mounted

if err = ns.tryRestoreFuseMountInNodeStage(ctx, mnt, stagingTargetPath); err != nil {
@@ -228,6 +232,8 @@
return nil, status.Error(codes.Internal, err.Error())
}

ns.healthChecker.StartChecker(healthCheckPath)

return &csi.NodeStageVolumeResponse{}, nil
}

@@ -270,6 +276,8 @@
}
}

ns.healthChecker.StartChecker(healthCheckPath)

return &csi.NodeStageVolumeResponse{}, nil
}

@@ -452,6 +460,15 @@ func (ns *NodeServer) NodePublishVolume(
targetPath := req.GetTargetPath()
volID := fsutil.VolumeID(req.GetVolumeId())

// dataPath is the directory that will be bind-mounted into the
// container. If "dataRoot" is empty, the dataPath is the same as the
// stagingTargetPath.
dataPath := stagingTargetPath
dataRoot, ok := req.GetVolumeContext()["dataRoot"]
if ok {
dataPath = path.Join(dataPath, dataRoot)
}

// Considering that kubelet makes sure the stage and publish operations
// are serialized, we don't need any extra locking in nodePublish

@@ -464,7 +481,7 @@
if err := ns.tryRestoreFuseMountsInNodePublish(
ctx,
volID,
stagingTargetPath,
dataPath,
targetPath,
req.GetVolumeContext(),
); err != nil {
@@ -510,15 +527,15 @@
return nil, status.Error(codes.Internal, err.Error())
}
if encrypted {
stagingTargetPath = fscrypt.AppendEncyptedSubdirectory(stagingTargetPath)
if err = fscrypt.IsDirectoryUnlocked(stagingTargetPath, "ceph"); err != nil {
dataPath = fscrypt.AppendEncyptedSubdirectory(dataPath)
if err = fscrypt.IsDirectoryUnlocked(dataPath, "ceph"); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

if err = mounter.BindMount(
ctx,
stagingTargetPath,
dataPath,
targetPath,
req.GetReadonly(),
mountOptions); err != nil {
@@ -608,6 +625,8 @@ func (ns *NodeServer) NodeUnstageVolume(

stagingTargetPath := req.GetStagingTargetPath()

ns.healthChecker.StopChecker(stagingTargetPath)

if err = fsutil.RemoveNodeStageMountinfo(fsutil.VolumeID(volID)); err != nil {
log.ErrorLog(ctx, "cephfs: failed to remove NodeStageMountinfo for volume %s: %v", volID, err)

@@ -670,6 +689,13 @@ func (ns *NodeServer) NodeGetCapabilities(
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
@@ -711,8 +737,31 @@ func (ns *NodeServer) NodeGetVolumeStats(
}

if stat.Mode().IsDir() {
return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
res, err := csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath, false)
if err != nil {
return nil, err
}

healthy, msg := ns.healthChecker.IsHealthy(req.GetStagingTargetPath())
res.VolumeCondition = &csi.VolumeCondition{
Abnormal: !healthy,
}

if !healthy {
res.VolumeCondition.Message = msg.Error()
}

return res, nil
}

return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath)
}

// getHealthCheckPath returns the path the health-checker should use for this
// volume: basedir itself when the volume context carries a "dataRoot" entry,
// otherwise the ".meta.csi" subdirectory of basedir.
func getHealthCheckPath(basedir string, volumeContext map[string]string) string {
_, ok := volumeContext["dataRoot"]
if !ok {
return path.Join(basedir, ".meta.csi")
}

return basedir
}
10 changes: 10 additions & 0 deletions internal/cephfs/store/volumeoptions.go
@@ -71,6 +71,12 @@ type VolumeOptions struct {

ProvisionVolume bool `json:"provisionVolume"`
BackingSnapshot bool `json:"backingSnapshot"`

// DataRoot is set to the directory that is bind-mounted into the
// container. The parent directory of DataRoot is not available to the
// end-user, but Ceph-CSI can use it for storing state, performing
// health-checks and the like.
DataRoot string `json:"dataRoot"`
}

// Connect a CephFS volume to the Ceph cluster.
@@ -266,6 +272,10 @@ func NewVolumeOptions(
return nil, err
}

if err = extractOptionalOption(&opts.DataRoot, "dataRoot", volOptions); err != nil {
return nil, err
}

if err = opts.InitKMS(ctx, volOptions, req.GetSecrets()); err != nil {
return nil, fmt.Errorf("failed to init KMS: %w", err)
}