
Merge pull request #4796 from MichaelEischer/parallel-dump-load
dump: Parallelize loading large files
MichaelEischer committed May 14, 2024
2 parents 92221c2 + e184538, commit 7ed560a
Showing 5 changed files with 192 additions and 28 deletions.
changelog/unreleased/pull-4796 (8 additions, 0 deletions)

@@ -0,0 +1,8 @@
Enhancement: Improve `dump` performance for large files

The `dump` command now retrieves the data chunks for a file in parallel. This
improves the download performance by up to the configured number of parallel
backend connections.

https://github.com/restic/restic/issues/3406
https://github.com/restic/restic/pull/4796
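
In practice the speedup is bounded by the backend's connection limit. For example, with an S3 repository that limit can be raised via the backend's connections option (an illustrative invocation, not part of this change; most backends default to 5 connections):

    restic -r s3:s3.amazonaws.com/bucket -o s3.connections=16 dump latest /data/disk.img > disk.img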
internal/bloblru/cache.go (46 additions, 2 deletions)

@@ -20,13 +20,15 @@ type Cache struct {
c *simplelru.LRU[restic.ID, []byte]

free, size int // Current and max capacity, in bytes.
inProgress map[restic.ID]chan struct{}
}

// New constructs a blob cache that stores at most size bytes worth of blobs.
func New(size int) *Cache {
c := &Cache{
		free:       size,
		size:       size,
		inProgress: make(map[restic.ID]chan struct{}),
}

// NewLRU wants us to specify some max. number of entries, else it errors.
@@ -85,6 +87,48 @@ func (c *Cache) Get(id restic.ID) ([]byte, bool) {
return blob, ok
}

func (c *Cache) GetOrCompute(id restic.ID, compute func() ([]byte, error)) ([]byte, error) {
// check if already cached
blob, ok := c.Get(id)
if ok {
return blob, nil
}

// check for parallel download or start our own
finish := make(chan struct{})
c.mu.Lock()
waitForResult, isDownloading := c.inProgress[id]
if !isDownloading {
c.inProgress[id] = finish

// remove progress channel once finished here
defer func() {
c.mu.Lock()
delete(c.inProgress, id)
c.mu.Unlock()
close(finish)
}()
}
c.mu.Unlock()

if isDownloading {
// wait for result of parallel download
<-waitForResult
blob, ok := c.Get(id)
if ok {
return blob, nil
}
}

// download it
blob, err := compute()
if err == nil {
c.Add(id, blob)
}

return blob, err
}

func (c *Cache) evict(key restic.ID, blob []byte) {
debug.Log("bloblru.Cache: evict %v, %d bytes", key, cap(blob))
c.free += cap(blob) + overhead
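GetOrCompute closes a race that the old Get/Add call sites left open: when several goroutines miss the cache on the same blob at once, only the first runs the download, while the rest block on the in-progress channel and re-check the cache afterwards. If the download fails, one of the waiters takes over and retries. The same pattern, reduced to a self-contained sketch with illustrative names (not restic's API):

package blobcache

import "sync"

// fetchOnce suppresses duplicate downloads: concurrent callers asking for the
// same key share a single invocation of fetch. Illustrative sketch only.
type fetchOnce struct {
	mu         sync.Mutex
	inProgress map[string]chan struct{} // key -> "download finished" signal
	cache      map[string][]byte
}

func (f *fetchOnce) get(key string, fetch func() ([]byte, error)) ([]byte, error) {
	for {
		f.mu.Lock()
		if data, ok := f.cache[key]; ok { // fast path: already cached
			f.mu.Unlock()
			return data, nil
		}
		if wait, running := f.inProgress[key]; running {
			f.mu.Unlock()
			<-wait // another goroutine is downloading; wait, then re-check
			continue
		}
		done := make(chan struct{})
		f.inProgress[key] = done
		f.mu.Unlock()

		data, err := fetch() // we are the designated downloader

		f.mu.Lock()
		if err == nil {
			f.cache[key] = data
		}
		delete(f.inProgress, key)
		f.mu.Unlock()
		close(done) // wake the waiters; on error one of them retries
		return data, err
	}
}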
internal/bloblru/cache_test.go (67 additions, 0 deletions)

@@ -1,11 +1,14 @@
package bloblru

import (
"context"
"fmt"
"math/rand"
"testing"

"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)

func TestCache(t *testing.T) {
@@ -52,6 +55,70 @@ func TestCache(t *testing.T) {
rtest.Equals(t, cacheSize, c.free)
}

func TestCacheGetOrCompute(t *testing.T) {
var id1, id2 restic.ID
id1[0] = 1
id2[0] = 2

const (
kiB = 1 << 10
cacheSize = 64*kiB + 3*overhead
)

c := New(cacheSize)

e := fmt.Errorf("broken")
_, err := c.GetOrCompute(id1, func() ([]byte, error) {
return nil, e
})
rtest.Equals(t, e, err, "expected error was not returned")

// fill buffer
data1 := make([]byte, 10*kiB)
blob, err := c.GetOrCompute(id1, func() ([]byte, error) {
return data1, nil
})
rtest.OK(t, err)
rtest.Equals(t, &data1[0], &blob[0], "wrong buffer returned")

// now the buffer should be returned without calling the compute function
blob, err = c.GetOrCompute(id1, func() ([]byte, error) {
return nil, e
})
rtest.OK(t, err)
rtest.Equals(t, &data1[0], &blob[0], "wrong buffer returned")

// check concurrency
wg, _ := errgroup.WithContext(context.TODO())
wait := make(chan struct{})
calls := make(chan struct{}, 10)

// start a bunch of blocking goroutines
for i := 0; i < 10; i++ {
wg.Go(func() error {
buf, err := c.GetOrCompute(id2, func() ([]byte, error) {
// block to ensure that multiple requests are waiting in parallel
<-wait
calls <- struct{}{}
return make([]byte, 42), nil
})
if len(buf) != 42 {
return fmt.Errorf("wrong buffer")
}
return err
})
}

close(wait)
rtest.OK(t, wg.Wait())
close(calls)
count := 0
for range calls {
count++
}
rtest.Equals(t, 1, count, "expected exactly one call of the compute function")
}

func BenchmarkAdd(b *testing.B) {
const (
MiB = 1 << 20
internal/dump/common.go (68 additions, 17 deletions)

@@ -9,18 +9,19 @@ import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/walker"
"golang.org/x/sync/errgroup"
)

// A Dumper writes trees and files from a repository to a Writer
// in an archive format.
type Dumper struct {
cache *bloblru.Cache
format string
repo   restic.Loader
w io.Writer
}

func New(format string, repo restic.Loader, w io.Writer) *Dumper {
return &Dumper{
cache: bloblru.New(64 << 20),
format: format,
@@ -103,27 +104,77 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error {
}

func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error {
	type loadTask struct {
		id  restic.ID
		out chan<- []byte
	}
	type writeTask struct {
		data <-chan []byte
	}

	loaderCh := make(chan loadTask)
	// per worker: allows for one blob that is being downloaded plus one blob
	// that is queued for writing
	writerCh := make(chan writeTask, d.repo.Connections()*2)

	wg, ctx := errgroup.WithContext(ctx)

	wg.Go(func() error {
		defer close(loaderCh)
		defer close(writerCh)
		for _, id := range node.Content {
			// non-blocking blob handover to allow the loader to load the next blob
			// while the previous one is still being written
			ch := make(chan []byte, 1)
			select {
			case loaderCh <- loadTask{id: id, out: ch}:
			case <-ctx.Done():
				return ctx.Err()
			}

			// the result channels are queued in file order, so the writer
			// emits the blobs in order even though they load in parallel
			select {
			case writerCh <- writeTask{data: ch}:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	for i := uint(0); i < d.repo.Connections(); i++ {
		wg.Go(func() error {
			for task := range loaderCh {
				blob, err := d.cache.GetOrCompute(task.id, func() ([]byte, error) {
					return d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil)
				})
				if err != nil {
					return err
				}

				select {
				case task.out <- blob:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}

	wg.Go(func() error {
		for result := range writerCh {
			select {
			case data := <-result.data:
				if _, err := w.Write(data); err != nil {
					return errors.Wrap(err, "Write")
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	return wg.Wait()
}

// IsDir checks if the given node is a directory.
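Note how writeNode keeps the output byte-for-byte in file order even though blobs finish downloading out of order: the dispatcher hands each blob's one-element result channel to the writer in file order, so the writer always blocks on the next blob in sequence while later loads proceed in the background. A standalone demo of that property (illustrative names; unlike writeNode it omits the ctx.Done() cancellation paths for brevity):

package main

import (
	"fmt"
	"math/rand"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7, 8}
	const workers = 4

	type task struct {
		item int
		out  chan<- int
	}
	loadCh := make(chan task)
	writeCh := make(chan (<-chan int), workers*2)

	var wg errgroup.Group

	wg.Go(func() error { // dispatcher: fixes the output order
		defer close(loadCh)
		defer close(writeCh)
		for _, it := range items {
			ch := make(chan int, 1) // buffered, so loaders never block on handover
			loadCh <- task{item: it, out: ch}
			writeCh <- ch // queued in input order
		}
		return nil
	})

	for i := 0; i < workers; i++ { // parallel "loaders"
		wg.Go(func() error {
			for t := range loadCh {
				// simulate variable download latency
				time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
				t.out <- t.item * 10
			}
			return nil
		})
	}

	wg.Go(func() error { // single writer: prints 10 20 30 ... 80, in order
		for ch := range writeCh {
			fmt.Println(<-ch)
		}
		return nil
	})

	if err := wg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}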
internal/fuse/file.go (3 additions, 9 deletions)

@@ -96,20 +96,14 @@ func (f *file) Open(_ context.Context, _ *fuse.OpenRequest, _ *fuse.OpenResponse
}

func (f *openFile) getBlobAt(ctx context.Context, i int) (blob []byte, err error) {
	blob, err = f.root.blobCache.GetOrCompute(f.node.Content[i], func() ([]byte, error) {
		return f.root.repo.LoadBlob(ctx, restic.DataBlob, f.node.Content[i], nil)
	})
	if err != nil {
		debug.Log("LoadBlob(%v, %v) failed: %v", f.node.Name, f.node.Content[i], err)
		return nil, unwrapCtxCanceled(err)
	}

	return blob, nil
}

