Skip to content

Commit

Permalink
Fix path joining in, and add a test for, --delete-destination (#2620)
Browse files Browse the repository at this point in the history
* Automatically create the hash data directory when using an alternative hash storage location

* Fix path joining in, and add a test for, --delete-destination
  • Loading branch information
adreed-msft committed May 3, 2024
1 parent bc518c0 commit 8c6cc61
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 53 deletions.
96 changes: 48 additions & 48 deletions cmd/syncProcessor.go
Expand Up @@ -247,7 +247,7 @@ func newSyncDeleteProcessor(cca *cookedSyncCmdArgs, fpo common.FolderPropertyOpt
if err != nil {
return nil, err
}

return newInteractiveDeleteProcessor(deleter.delete, cca.deleteDestination, cca.fromTo.To().String(), cca.destination, cca.incrementDeletionCount, cca.dryrunMode), nil
}

Expand Down Expand Up @@ -284,7 +284,7 @@ func (b *remoteResourceDeleter) getObjectURL(objectURL string) (*url.URL, error)
if err != nil {
return nil, err
}
return u,nil
return u, nil
}

func (b *remoteResourceDeleter) delete(object StoredObject) error {
Expand All @@ -305,12 +305,12 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error {

var err error
var objURL *url.URL

switch b.targetLocation {
case common.ELocation.Blob():
bsc, _ := sc.BlobServiceClient()
var blobClient *blob.Client = bsc.NewContainerClient(b.containerName).NewBlobClient(path.Join(b.rootPath + object.relativePath))
var blobClient *blob.Client = bsc.NewContainerClient(b.containerName).NewBlobClient(path.Join(b.rootPath, object.relativePath))

objURL, err = b.getObjectURL(blobClient.URL())
if err != nil {
break
Expand All @@ -321,7 +321,7 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error {
_, err = blobClient.Delete(b.ctx, nil)
case common.ELocation.File():
fsc, _ := sc.FileServiceClient()
fileClient := fsc.NewShareClient(b.containerName).NewRootDirectoryClient().NewFileClient(path.Join(b.rootPath + object.relativePath))
fileClient := fsc.NewShareClient(b.containerName).NewRootDirectoryClient().NewFileClient(path.Join(b.rootPath, object.relativePath))

objURL, err = b.getObjectURL(fileClient.URL())
if err != nil {
Expand All @@ -330,13 +330,13 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error {
b.folderManager.RecordChildExists(objURL)
defer b.folderManager.RecordChildDeleted(objURL)

err = common.DoWithOverrideReadOnlyOnAzureFiles(b.ctx, func()(interface{}, error) {
err = common.DoWithOverrideReadOnlyOnAzureFiles(b.ctx, func() (interface{}, error) {
return fileClient.Delete(b.ctx, nil)
}, fileClient, b.forceIfReadOnly)
case common.ELocation.BlobFS():
dsc, _ := sc.DatalakeServiceClient()
fileClient := dsc.NewFileSystemClient(b.containerName).NewFileClient(path.Join(b.rootPath + object.relativePath))
fileClient := dsc.NewFileSystemClient(b.containerName).NewFileClient(path.Join(b.rootPath, object.relativePath))

objURL, err = b.getObjectURL(fileClient.DFSURL())
if err != nil {
break
Expand Down Expand Up @@ -369,48 +369,48 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error {
var objURL *url.URL
var err error
switch b.targetLocation {
case common.ELocation.Blob():
bsc, _ := sc.BlobServiceClient()
blobClient := bsc.NewContainerClient(b.containerName).NewBlobClient(path.Join(b.rootPath + object.relativePath))
// HNS endpoint doesn't like delete snapshots on a directory
objURL, err = b.getObjectURL(blobClient.URL())
if err != nil {
return err
}
case common.ELocation.Blob():
bsc, _ := sc.BlobServiceClient()
blobClient := bsc.NewContainerClient(b.containerName).NewBlobClient(path.Join(b.rootPath, object.relativePath))
// HNS endpoint doesn't like delete snapshots on a directory
objURL, err = b.getObjectURL(blobClient.URL())
if err != nil {
return err
}

deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
_, err = blobClient.Delete(b.ctx, nil)
return (err == nil)
}
case common.ELocation.File():
fsc, _ := sc.FileServiceClient()
dirClient := fsc.NewShareClient(b.containerName).NewDirectoryClient(path.Join(b.rootPath + object.relativePath))
objURL, err = b.getObjectURL(dirClient.URL())
if err != nil {
return err
}
deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
_, err = blobClient.Delete(b.ctx, nil)
return (err == nil)
}
case common.ELocation.File():
fsc, _ := sc.FileServiceClient()
dirClient := fsc.NewShareClient(b.containerName).NewDirectoryClient(path.Join(b.rootPath, object.relativePath))
objURL, err = b.getObjectURL(dirClient.URL())
if err != nil {
return err
}

deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
err = common.DoWithOverrideReadOnlyOnAzureFiles(b.ctx, func()(interface{}, error) {
return dirClient.Delete(b.ctx, nil)
}, dirClient, b.forceIfReadOnly)
return (err == nil)
}
case common.ELocation.BlobFS():
dsc, _ := sc.DatalakeServiceClient()
directoryClient := dsc.NewFileSystemClient(b.containerName).NewDirectoryClient(path.Join(b.rootPath + object.relativePath))
objURL, err = b.getObjectURL(directoryClient.DFSURL())
if err != nil {
return err
}
deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
err = common.DoWithOverrideReadOnlyOnAzureFiles(b.ctx, func() (interface{}, error) {
return dirClient.Delete(b.ctx, nil)
}, dirClient, b.forceIfReadOnly)
return (err == nil)
}
case common.ELocation.BlobFS():
dsc, _ := sc.DatalakeServiceClient()
directoryClient := dsc.NewFileSystemClient(b.containerName).NewDirectoryClient(path.Join(b.rootPath, object.relativePath))
objURL, err = b.getObjectURL(directoryClient.DFSURL())
if err != nil {
return err
}

deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
recursiveContext := common.WithRecursive(b.ctx, false)
_, err = directoryClient.Delete(recursiveContext, nil)
return (err == nil)
}
default:
panic("not implemented, check your code")
deleteFunc = func(ctx context.Context, logger common.ILogger) bool {
recursiveContext := common.WithRecursive(b.ctx, false)
_, err = directoryClient.Delete(recursiveContext, nil)
return (err == nil)
}
default:
panic("not implemented, check your code")
}

b.folderManager.RecordChildExists(objURL)
Expand Down
2 changes: 1 addition & 1 deletion common/hash_data.go
Expand Up @@ -61,7 +61,7 @@ func (mode *HashStorageMode) Parse(s string) error {

// NewHashDataAdapter is a function that creates a new HiddenFileDataAdapter on systems that do not override the default functionality.
var NewHashDataAdapter = func(hashPath, dataPath string, mode HashStorageMode) (HashDataAdapter, error) {
return &HiddenFileDataAdapter{hashPath, dataPath}, nil
return &HiddenFileDataAdapter{hashPath, dataPath, &sync.Once{}}, nil
}

// HashDataAdapter implements an interface to pull and set hash data on files based upon a relative path
Expand Down
3 changes: 3 additions & 0 deletions common/hash_data_adapter_hidden_files.go
Expand Up @@ -6,11 +6,14 @@ import (
"io"
"os"
"path/filepath"
"sync"
)

// HiddenFileDataAdapter persists per-file hash data in hidden sidecar files.
// It is the default HashDataAdapter returned by NewHashDataAdapter.
type HiddenFileDataAdapter struct {
	hashBasePath string // root directory for the hash sidecar files; "" == dataBasePath
	dataBasePath string // root directory of the data files being hashed

	// createHashDirOnce is initialized by NewHashDataAdapter; presumably used to
	// lazily create hashBasePath exactly once when it differs from dataBasePath
	// (creation site not visible here — confirm in the adapter's write path).
	createHashDirOnce *sync.Once
}

func (a *HiddenFileDataAdapter) GetMode() HashStorageMode {
Expand Down
17 changes: 16 additions & 1 deletion e2etest/newe2e_resource_managers_blobfs.go
Expand Up @@ -335,7 +335,7 @@ func (b *BlobFSPathResourceProvider) CreateParents(a Asserter) {

dir, _ := path.Split(b.objectPath)
if dir != "" {
obj := b.Container.GetObject(a, strings.TrimSuffix(dir, "/"), common.EEntityType.Folder()).(*FileObjectResourceManager)
obj := b.Container.GetObject(a, strings.TrimSuffix(dir, "/"), common.EEntityType.Folder()).(*BlobFSPathResourceProvider)
// Create recursively calls this function.
if !obj.Exists() {
obj.Create(a, nil, ObjectProperties{})
Expand Down Expand Up @@ -388,6 +388,14 @@ func (b *BlobFSPathResourceProvider) Create(a Asserter, body ObjectContentContai
}

meta[common.POSIXSymlinkMeta] = pointerTo("true")
} else if b.entityType == common.EEntityType.Folder() {
meta = make(common.Metadata)

for k, v := range properties.Metadata {
meta[k] = v
}

meta[common.POSIXFolderMeta] = pointerTo("true")
}
b.SetMetadata(a, meta)

Expand Down Expand Up @@ -477,6 +485,13 @@ func (b *BlobFSPathResourceProvider) SetHTTPHeaders(a Asserter, h contentHeaders

func (b *BlobFSPathResourceProvider) SetMetadata(a Asserter, metadata common.Metadata) {
_, err := b.getFileClient().SetMetadata(ctx, metadata, nil)

if datalakeerror.HasCode(err, datalakeerror.UnsupportedHeader) {
// retry, removing hdi_isfolder
delete(metadata, common.POSIXFolderMeta)
_, err = b.getFileClient().SetMetadata(ctx, metadata, nil)
}

a.NoError("Set metadata", err)
}

Expand Down
4 changes: 2 additions & 2 deletions e2etest/newe2e_resource_managers_file.go
Expand Up @@ -438,9 +438,9 @@ func (f *FileObjectResourceManager) CreateParents(a Asserter) {
f.Share.Create(a, ContainerProperties{})
}

dir, _ := path.Split(f.path)
dir, _ := path.Split(strings.TrimSuffix(f.path, "/"))
if dir != "" {
obj := f.Share.GetObject(a, strings.TrimSuffix(dir, "/"), common.EEntityType.Folder()).(*FileObjectResourceManager)
obj := f.Share.GetObject(a, dir, common.EEntityType.Folder()).(*FileObjectResourceManager)
// Create recursively calls this function.
if !obj.Exists() {
obj.Create(a, nil, ObjectProperties{})
Expand Down
2 changes: 1 addition & 1 deletion e2etest/newe2e_task_resourcemanagement.go
Expand Up @@ -135,7 +135,7 @@ func ValidateResource[T ResourceManager](a Asserter, target T, definition Matche
objDef := definition.(ResourceDefinitionObject)

if !objDef.ShouldExist() {
a.AssertNow("object must not exist", Equal{}, objMan.Exists(), false)
a.Assert(fmt.Sprintf("object %s must not exist", objMan.ObjectName()), Equal{}, objMan.Exists(), false)
return
}

Expand Down
40 changes: 40 additions & 0 deletions e2etest/zt_newe2e_sync_test.go
Expand Up @@ -130,3 +130,43 @@ func (s *SyncTestSuite) Scenario_TestSyncHashStorageModes(a *ScenarioVariationMa

a.Assert("hashes must match", Equal{}, data.Data, base64.StdEncoding.EncodeToString(md5[:]))
}

// Scenario_TestSyncRemoveDestination verifies that `azcopy sync --delete-destination=true`
// removes destination objects that do not exist at the source, across every
// supported source/destination location pairing.
func (s *SyncTestSuite) Scenario_TestSyncRemoveDestination(svm *ScenarioVariationManager) {
	// Vary both endpoints over every supported location.
	locations := []common.Location{common.ELocation.Local(), common.ELocation.Blob(), common.ELocation.File(), common.ELocation.BlobFS()}
	srcLoc := ResolveVariation(svm, locations)
	dstLoc := ResolveVariation(svm, locations)

	// Local -> Local is not a valid sync pair; drop that variation.
	if srcLoc == dstLoc && srcLoc == common.ELocation.Local() {
		svm.InvalidateScenario()
		return
	}

	// An empty source container: everything at the destination is "extra".
	srcContainer := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, srcLoc, GetResourceOptions{
		PreferredAccount: common.Iff(srcLoc == common.ELocation.BlobFS(), pointerTo(PrimaryHNSAcct), nil),
	}), ResourceDefinitionContainer{})

	// Seed the destination with objects (one at the root, one nested) that sync must delete.
	dstContainer := CreateResource[ContainerResourceManager](svm, GetRootResource(svm, dstLoc, GetResourceOptions{
		PreferredAccount: common.Iff(dstLoc == common.ELocation.BlobFS(), pointerTo(PrimaryHNSAcct), nil),
	}), ResourceDefinitionContainer{
		Objects: ObjectResourceMappingFlat{
			"deleteme.txt":      ResourceDefinitionObject{Body: NewRandomObjectContentContainer(svm, 512)},
			"also/deleteme.txt": ResourceDefinitionObject{Body: NewRandomObjectContentContainer(svm, 512)},
		},
	})

	RunAzCopy(svm, AzCopyCommand{
		Verb:    AzCopyVerbSync,
		Targets: []ResourceManager{srcContainer, dstContainer},
		Flags: SyncFlags{
			CopySyncCommonFlags: CopySyncCommonFlags{
				Recursive: pointerTo(true),
			},
			DeleteDestination: pointerTo(true),
		},
	})

	// Both seeded objects must be gone after the sync.
	ValidateResource[ContainerResourceManager](svm, dstContainer, ResourceDefinitionContainer{
		Objects: ObjectResourceMappingFlat{
			"deleteme.txt":      ResourceDefinitionObject{ObjectShouldExist: pointerTo(false)},
			"also/deleteme.txt": ResourceDefinitionObject{ObjectShouldExist: pointerTo(false)},
		},
	}, false)
}

0 comments on commit 8c6cc61

Please sign in to comment.