Skip to content

Commit

Permalink
repository: Add blob loading fallback to LoadBlobsFromPack
Browse files Browse the repository at this point in the history
Try to retrieve individual blobs via LoadBlob if streaming did not work.
  • Loading branch information
MichaelEischer committed Jan 19, 2024
1 parent 62111f4 commit 3cacc80
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 21 deletions.
9 changes: 2 additions & 7 deletions internal/repository/repack.go
Expand Up @@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
for t := range downloadQueue {
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
if ierr != nil {
// no luck, return the original error
return err
}
// a required blob couldn't be retrieved
return err
}

keepMutex.Lock()
Expand Down
35 changes: 30 additions & 5 deletions internal/repository/repository.go
Expand Up @@ -876,6 +876,7 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
}

type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)

// Skip sections with more than 4MB unused blobs
const maxUnusedRange = 4 * 1024 * 1024
Expand All @@ -885,10 +886,10 @@ const maxUnusedRange = 4 * 1024 * 1024
// handleBlobFn is called at most once for each blob. If the callback returns an error,
// then LoadBlobsFromPack will abort and not retry it.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn)
}

func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
Expand All @@ -907,7 +908,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
}
if blobs[i].Offset-lastPos > maxUnusedRange {
// load everything up to the skipped file section
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn)
if err != nil {
return err
}
Expand All @@ -916,10 +917,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lastPos = blobs[i].Offset + blobs[i].Length
}
// load remainder
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
return streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:], handleBlobFn)
}

func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}

dataStart := blobs[0].Offset
Expand Down Expand Up @@ -955,6 +956,17 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
return err
}

if val.Err != nil && loadBlobFn != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
if ierr == nil {
// success
val.Plaintext = buf
val.Err = nil
}
}

err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
cancel()
Expand All @@ -965,6 +977,19 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
}
return nil
})

// the context is only still valid if handleBlobFn never returned an error
if ctx.Err() == nil && loadBlobFn != nil {
// check whether we can get the remaining blobs somewhere else
for _, entry := range blobs {
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
err = handleBlobFn(entry.BlobHandle, buf, ierr)
if err != nil {
break
}
}
}

return errors.Wrap(err, "StreamPack")
}

Expand Down
86 changes: 77 additions & 9 deletions internal/repository/repository_internal_test.go
Expand Up @@ -147,13 +147,7 @@ func TestStreamPack(t *testing.T) {

func testStreamPack(t *testing.T, version uint) {
// always use the same key for deterministic output
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`

var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
if err != nil {
t.Fatal(err)
}
key := testKey(t)

blobSizes := []int{
5522811,
Expand Down Expand Up @@ -276,7 +270,7 @@ func testStreamPack(t *testing.T, version uint) {

loadCalls = 0
shortFirstLoad = test.shortFirstLoad
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -339,7 +333,7 @@ func testStreamPack(t *testing.T, version uint) {
return err
}

err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
if err == nil {
t.Fatalf("wanted error %v, got nil", test.err)
}
Expand All @@ -351,3 +345,77 @@ func testStreamPack(t *testing.T, version uint) {
}
})
}

func testKey(t *testing.T) crypto.Key {
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`

var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
if err != nil {
t.Fatal(err)
}
return key
}

func TestStreamPackFallback(t *testing.T) {
test := func(t *testing.T, failLoad bool) {
key := testKey(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

plaintext := rtest.Random(800, 42)
blobID := restic.Hash(plaintext)
blobs := []restic.Blob{
{
Length: uint(crypto.CiphertextLength(len(plaintext))),
Offset: 0,
BlobHandle: restic.BlobHandle{
ID: blobID,
Type: restic.DataBlob,
},
},
}

var loadPack backendLoadFn
if failLoad {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return errors.New("load error")
}
} else {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
// just return an empty array to provoke an error
data := make([]byte, length)
return fn(bytes.NewReader(data))
}
}

loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
if id == blobID {
return plaintext, nil
}
return nil, errors.New("unknown blob")
}

blobOK := false
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
rtest.OK(t, err)
rtest.Equals(t, blobID, blob.ID)
rtest.Equals(t, plaintext, buf)
blobOK = true
return err
}

err := streamPack(ctx, loadPack, loadBlob, &key, restic.ID{}, blobs, handleBlob)
rtest.OK(t, err)
rtest.Assert(t, blobOK, "blob failed to load")
}

t.Run("corrupted blob", func(t *testing.T) {
test(t, false)
})

// test fallback for failed pack loading
t.Run("failed load", func(t *testing.T) {
test(t, true)
})
}

0 comments on commit 3cacc80

Please sign in to comment.