Skip to content

Commit

Permalink
command: use external sort for comparison in sync (#483)
Browse files Browse the repository at this point in the history
It uses an external sort instead of an in-memory sort to dramatically reduce memory usage at the expense of speed.
It uses the encoding/gob format to store objects on disk and restore them from there.

Fixes #441
Fixes #447
  • Loading branch information
kucukaslan committed Jun 16, 2023
1 parent 3e08061 commit f0ce87d
Show file tree
Hide file tree
Showing 27 changed files with 2,318 additions and 104 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,11 @@
- Allow adjacent slashes to be used as keys when uploading to remote. ([#459](https://github.com/peak/s5cmd/pull/459))
- Debian packages are provided on [releases page](https://github.com/peak/s5cmd/releases) ([#380](https://github.com/peak/s5cmd/issues/380))
- Upgraded minimum required Go version to 1.17.
- The `sync` command now uses an external (disk-backed) sort instead of an in-memory
  sort. This change reduces RAM usage from ~10 GB to ~1.5 GB for a `sync` operation on a
  directory containing 1,000,000 files, at the cost of speed (about 20% slower for
  1,000,000 objects). For smaller directories (~50,000 files) there is no significant
  change in speed. ([#483](https://github.com/peak/s5cmd/pull/483))

- Improve auto-completion support of s5cmd for `zsh` and `bash`, start supporting `pwsh` and stop the support for `fish`. Now s5cmd can complete bucket names, s3 keys in a bucket and the local files. However, `install-completion` flag no longer _installs_ the completion script to `*rc` files instead it merely gives instructions to install autocompletion and provides the autocompletion script ([#500](https://github.com/peak/s5cmd/pull/500)).

#### Bugfixes
Expand Down
280 changes: 183 additions & 97 deletions command/sync.go
Expand Up @@ -6,11 +6,11 @@ import (
"io"
"os"
"path/filepath"
"sort"
"strings"
"sync"

"github.com/hashicorp/go-multierror"
"github.com/lanrat/extsort"
"github.com/urfave/cli/v2"

errorpkg "github.com/peak/s5cmd/error"
Expand All @@ -20,6 +20,11 @@ import (
"github.com/peak/s5cmd/storage/url"
)

const (
extsortChannelBufferSize = 1_000
extsortChunkSize = 100_000
)

var syncHelpTemplate = `Name:
{{.HelpName}} - {{.Usage}}
Expand Down Expand Up @@ -216,71 +221,66 @@ func (s Sync) Run(c *cli.Context) error {
return multierror.Append(err, merrorWaiter).ErrorOrNil()
}

// compareObjects compares source and destination objects.
// compareObjects compares source and destination objects. It assumes that
// sourceObjects and destObjects channels are already sorted in ascending order.
// Returns objects those in only source, only destination
// and both.
// The algorithm is taken from;
// https://github.com/rclone/rclone/blob/HEAD/fs/march/march.go#L304
func compareObjects(sourceObjects, destObjects []*storage.Object) ([]*url.URL, []*url.URL, []*ObjectPair) {
// sort the source and destination objects.
sort.SliceStable(sourceObjects, func(i, j int) bool {
return sourceObjects[i].URL.Relative() < sourceObjects[j].URL.Relative()
})
sort.SliceStable(destObjects, func(i, j int) bool {
return destObjects[i].URL.Relative() < destObjects[j].URL.Relative()
})

func compareObjects(sourceObjects, destObjects chan *storage.Object) (chan *url.URL, chan *url.URL, chan *ObjectPair) {
var (
srcOnly []*url.URL
dstOnly []*url.URL
commonObj []*ObjectPair
srcOnly = make(chan *url.URL, extsortChannelBufferSize)
dstOnly = make(chan *url.URL, extsortChannelBufferSize)
commonObj = make(chan *ObjectPair, extsortChannelBufferSize)
srcName string
dstName string
)

for iSrc, iDst := 0, 0; ; iSrc, iDst = iSrc+1, iDst+1 {
var srcObject, dstObject *storage.Object
var srcName, dstName string

if iSrc < len(sourceObjects) {
srcObject = sourceObjects[iSrc]
srcName = filepath.ToSlash(srcObject.URL.Relative())
}
go func() {
src, srcOk := <-sourceObjects
dst, dstOk := <-destObjects

if iDst < len(destObjects) {
dstObject = destObjects[iDst]
dstName = filepath.ToSlash(dstObject.URL.Relative())
}
defer close(srcOnly)
defer close(dstOnly)
defer close(commonObj)

if srcObject == nil && dstObject == nil {
break
}
for {
if srcOk {
srcName = filepath.ToSlash(src.URL.Relative())
}
if dstOk {
dstName = filepath.ToSlash(dst.URL.Relative())
}

if srcObject != nil && dstObject != nil {
if srcName > dstName {
srcObject = nil
iSrc--
} else if srcName == dstName { // if there is a match.
commonObj = append(commonObj, &ObjectPair{src: srcObject, dst: dstObject})
} else {
dstObject = nil
iDst--
if srcOk && dstOk {
if srcName < dstName {
srcOnly <- src.URL
src, srcOk = <-sourceObjects
} else if srcName == dstName { // if there is a match.
commonObj <- &ObjectPair{src: src, dst: dst}
src, srcOk = <-sourceObjects
dst, dstOk = <-destObjects
} else {
dstOnly <- dst.URL
dst, dstOk = <-destObjects
}
} else if srcOk {
srcOnly <- src.URL
src, srcOk = <-sourceObjects
} else if dstOk {
dstOnly <- dst.URL
dst, dstOk = <-destObjects
} else /* if !srcOK && !dstOk */ {
break
}
}
}()

switch {
case srcObject == nil && dstObject == nil:
// do nothing
case srcObject == nil:
dstOnly = append(dstOnly, dstObject.URL)
case dstObject == nil:
srcOnly = append(srcOnly, srcObject.URL)
}
}
return srcOnly, dstOnly, commonObj
}

// getSourceAndDestinationObjects returns source and destination
// objects from given urls.
func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl *url.URL) ([]*storage.Object, []*storage.Object, error) {
// getSourceAndDestinationObjects returns source and destination objects from
// given URLs. The returned channels gives objects sorted in ascending order
// with respect to their url.Relative path. See also storage.Less.
func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl *url.URL) (chan *storage.Object, chan *storage.Object, error) {
sourceClient, err := storage.NewClient(ctx, srcurl, s.storageOpts)
if err != nil {
return nil, nil, err
Expand All @@ -305,46 +305,105 @@ func (s Sync) getSourceAndDestinationObjects(ctx context.Context, srcurl, dsturl
}

var (
sourceObjects []*storage.Object
destObjects []*storage.Object
wg sync.WaitGroup
sourceObjects = make(chan *storage.Object, extsortChannelBufferSize)
destObjects = make(chan *storage.Object, extsortChannelBufferSize)
)

extsortDefaultConfig := extsort.DefaultConfig()
extsortConfig := &extsort.Config{
ChunkSize: extsortChunkSize,
NumWorkers: extsortDefaultConfig.NumWorkers,
ChanBuffSize: extsortChannelBufferSize,
SortedChanBuffSize: extsortChannelBufferSize,
}
extsortDefaultConfig = nil

// get source objects.
wg.Add(1)
go func() {
defer wg.Done()
srcObjectChannel := sourceClient.List(ctx, srcurl, s.followSymlinks)
for srcObject := range srcObjectChannel {
if s.shouldSkipObject(srcObject, true) {
continue
defer close(sourceObjects)
unfilteredSrcObjectChannel := sourceClient.List(ctx, srcurl, s.followSymlinks)
filteredSrcObjectChannel := make(chan extsort.SortType, extsortChannelBufferSize)

go func() {
defer close(filteredSrcObjectChannel)
// filter and redirect objects
for st := range unfilteredSrcObjectChannel {
if s.shouldSkipObject(st, true) {
continue
}
filteredSrcObjectChannel <- *st
}
sourceObjects = append(sourceObjects, srcObject)
}()

var (
sorter *extsort.SortTypeSorter
srcOutputChan chan extsort.SortType
)

sorter, srcOutputChan, srcErrCh := extsort.New(filteredSrcObjectChannel, storage.FromBytes, storage.Less, extsortConfig)
sorter.Sort(ctx)

for srcObject := range srcOutputChan {
o := srcObject.(storage.Object)
sourceObjects <- &o
}

// read and print the external sort errors
go func() {
for err := range srcErrCh {
printError(s.fullCommand, s.op, err)
}
}()
}()

// get destination objects.
wg.Add(1)
go func() {
defer wg.Done()
destObjectsChannel := destClient.List(ctx, destObjectsURL, false)
for destObject := range destObjectsChannel {
if s.shouldSkipObject(destObject, false) {
continue
defer close(destObjects)
unfilteredDestObjectsChannel := destClient.List(ctx, destObjectsURL, false)
filteredDstObjectChannel := make(chan extsort.SortType, extsortChannelBufferSize)

go func() {
defer close(filteredDstObjectChannel)

// filter and redirect objects
for dt := range unfilteredDestObjectsChannel {
if s.shouldSkipObject(dt, false) {
continue
}
filteredDstObjectChannel <- *dt
}
destObjects = append(destObjects, destObject)
}()

var (
dstSorter *extsort.SortTypeSorter
dstOutputChan chan extsort.SortType
)

dstSorter, dstOutputChan, dstErrCh := extsort.New(filteredDstObjectChannel, storage.FromBytes, storage.Less, extsortConfig)
dstSorter.Sort(ctx)

for destObject := range dstOutputChan {
o := destObject.(storage.Object)
destObjects <- &o
}

// read and print the external sort errors
go func() {
for err := range dstErrCh {
printError(s.fullCommand, s.op, err)
}
}()

}()

wg.Wait()
return sourceObjects, destObjects, nil
}

// planRun prepares the commands and writes them to writer 'w'.
func (s Sync) planRun(
c *cli.Context,
onlySource, onlyDest []*url.URL,
common []*ObjectPair,
onlySource, onlyDest chan *url.URL,
common chan *ObjectPair,
dsturl *url.URL,
strategy SyncStrategy,
w io.WriteCloser,
Expand All @@ -359,44 +418,71 @@ func (s Sync) planRun(
"raw": true,
}

// it should wait until both of the child goroutines for onlySource and common channels
// are completed before closing the WriteCloser w to ensure that all URLs are processed.
var wg sync.WaitGroup

// only in source
for _, srcurl := range onlySource {
curDestURL := generateDestinationURL(srcurl, dsturl, isBatch)
command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL)
if err != nil {
printDebug(s.op, err, srcurl, curDestURL)
continue
wg.Add(1)
go func() {
defer wg.Done()
for srcurl := range onlySource {
curDestURL := generateDestinationURL(srcurl, dsturl, isBatch)
command, err := generateCommand(c, "cp", defaultFlags, srcurl, curDestURL)
if err != nil {
printDebug(s.op, err, srcurl, curDestURL)
continue
}
fmt.Fprintln(w, command)
}
fmt.Fprintln(w, command)
}
}()

// both in source and destination
for _, commonObject := range common {
sourceObject, destObject := commonObject.src, commonObject.dst
curSourceURL, curDestURL := sourceObject.URL, destObject.URL
err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}
wg.Add(1)
go func() {
defer wg.Done()
for commonObject := range common {
sourceObject, destObject := commonObject.src, commonObject.dst
curSourceURL, curDestURL := sourceObject.URL, destObject.URL
err := strategy.ShouldSync(sourceObject, destObject) // check if object should be copied.
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}

command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
command, err := generateCommand(c, "cp", defaultFlags, curSourceURL, curDestURL)
if err != nil {
printDebug(s.op, err, curSourceURL, curDestURL)
continue
}
fmt.Fprintln(w, command)
}
fmt.Fprintln(w, command)
}
}()

// only in destination
if s.delete && len(onlyDest) > 0 {
command, err := generateCommand(c, "rm", defaultFlags, onlyDest...)
if s.delete {
// unfortunately we need to read them all!
// or rewrite generateCommand function?
dstURLs := make([]*url.URL, 0, extsortChunkSize)

for d := range onlyDest {
dstURLs = append(dstURLs, d)
}
command, err := generateCommand(c, "rm", defaultFlags, dstURLs...)
if err != nil {
printDebug(s.op, err, onlyDest...)
printDebug(s.op, err, dstURLs...)
return
}
fmt.Fprintln(w, command)
} else {
// we only need to consume them from the channel so that rest of the objects
// can be sent to channel.
for d := range onlyDest {
_ = d
}
}

wg.Wait()
}

// generateDestinationURL generates destination url for given
Expand Down
5 changes: 3 additions & 2 deletions e2e/sync_test.go
Expand Up @@ -15,9 +15,7 @@ import (
// sync -n s3://bucket/object file
func TestSyncFailForNonsharedFlagsFromCopyCommand(t *testing.T) {
t.Parallel()

s3client, s5cmd := setup(t)

const (
filename = "source.go"
)
Expand Down Expand Up @@ -222,6 +220,9 @@ func TestSyncLocalFolderToS3EmptyBucket(t *testing.T) {
3: equals(`cp %vtestfile.txt %vtestfile.txt`, src, dst),
}, sortInput(true))

// there should be no error, since "no object found" error for destination is ignored
assertLines(t, result.Stderr(), map[int]compareFunc{})

// assert local filesystem
expected := fs.Expected(t, folderLayout...)
assert.Assert(t, fs.Equal(workdir.Path(), expected))
Expand Down

0 comments on commit f0ce87d

Please sign in to comment.