Merge pull request #4605 from MichaelEischer/better-restorer-error-handling

Rework repository.StreamPacks & better restorer error handling
MichaelEischer committed May 1, 2024
2 parents 1e57057 + 676f0dc commit ffe5439
Showing 5 changed files with 242 additions and 76 deletions.
8 changes: 8 additions & 0 deletions changelog/unreleased/issue-4627
@@ -0,0 +1,8 @@
Enhancement: Improve reliability of backend operations

Restic now downloads pack files in large chunks instead of using a streaming
download. This prevents failures due to interrupted streams. The `restore`
command now also retries downloading individual blobs that cannot be retrieved.

https://github.com/restic/restic/issues/4627
https://github.com/restic/restic/pull/4605
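
The entry above combines two mechanisms that appear in the diff below: pack files are fetched in bounded ranges rather than as one long stream, and blobs that still cannot be read are re-fetched individually. A minimal sketch of the first idea in plain Go — fetchRange and chunkSize are hypothetical stand-ins, not restic's backend API — where a broken connection only forces a retry of the affected range:

package main

import (
	"bytes"
	"errors"
	"fmt"
)

const chunkSize = 4 // tiny for the demo; restic uses multi-MiB ranges

var failOnce = true

// fetchRange stands in for a backend range request. It fails once to show
// that only the affected range is retried, not the whole download.
func fetchRange(src []byte, off int) ([]byte, error) {
	if off == chunkSize && failOnce {
		failOnce = false
		return nil, errors.New("connection reset")
	}
	end := off + chunkSize
	if end > len(src) {
		end = len(src)
	}
	return src[off:end], nil
}

func download(src []byte) ([]byte, error) {
	var out bytes.Buffer
	for off := 0; off < len(src); off += chunkSize {
		chunk, err := fetchRange(src, off)
		if err != nil {
			// retry just this range instead of restarting the stream
			if chunk, err = fetchRange(src, off); err != nil {
				return nil, err
			}
		}
		out.Write(chunk)
	}
	return out.Bytes(), nil
}

func main() {
	data, err := download([]byte("pack file bytes"))
	fmt.Println(string(data), err) // pack file bytes <nil>
}
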
36 changes: 32 additions & 4 deletions internal/checker/checker.go
@@ -567,7 +567,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)

it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
for {
val, err := it.Next()
if err == repository.ErrPackEOF {
@@ -653,11 +653,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
return nil
}

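// bufReader adapts the checker's bufio.Reader to the discardReader interface
// consumed by repository.NewPackBlobIterator (defined in repository.go below).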
type bufReader struct {
rd *bufio.Reader
buf []byte
}

func newBufReader(rd *bufio.Reader) *bufReader {
return &bufReader{
rd: rd,
}
}

func (b *bufReader) Discard(n int) (discarded int, err error) {
return b.rd.Discard(n)
}

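// ReadFull returns the next n bytes in a buffer that is reused across calls;
// the returned slice is only valid until the next call to ReadFull.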
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
if cap(b.buf) < n {
b.buf = make([]byte, n)
}
b.buf = b.buf[:n]

_, err = io.ReadFull(b.rd, b.buf)
if err != nil {
return nil, err
}
return b.buf, nil
}

// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan)
}

const maxStreamBufferSize = 4 * 1024 * 1024

// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
defer close(errChan)
@@ -675,9 +705,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
// run workers
for i := 0; i < workerCount; i++ {
g.Go(func() error {
// create a buffer that is large enough to be reused by repository.StreamPack
// this ensures that we can read the pack header later on
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
9 changes: 2 additions & 7 deletions internal/repository/repack.go
@@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
for t := range downloadQueue {
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
if ierr != nil {
// no luck, return the original error
return err
}
// a required blob couldn't be retrieved
return err
}

keepMutex.Lock()
169 changes: 112 additions & 57 deletions internal/repository/repository.go
@@ -1,7 +1,6 @@
package repository

import (
"bufio"
"bytes"
"context"
"fmt"
@@ -12,7 +11,6 @@ import (
"sort"
"sync"

"github.com/cenkalti/backoff/v4"
"github.com/klauspost/compress/zstd"
"github.com/restic/chunker"
"github.com/restic/restic/internal/backend"
@@ -29,8 +27,6 @@ import (
"golang.org/x/sync/errgroup"
)

const MaxStreamBufferSize = 4 * 1024 * 1024

const MinPackSize = 4 * 1024 * 1024
const DefaultPackSize = 16 * 1024 * 1024
const MaxPackSize = 128 * 1024 * 1024
@@ -966,19 +962,21 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
}

type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)

// Skip sections with more than 4MB unused blobs
const maxUnusedRange = 4 * 1024 * 1024
// Skip sections with more than 1MB unused blobs
const maxUnusedRange = 1 * 1024 * 1024

// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
// handleBlobFn is called at most once for each blob. If the callback returns an error,
// then LoadBlobsFromPack will abort and not retry it.
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
// this specific call. The callback must not keep a reference to buf.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
}
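
The buffer-reuse contract documented above is easy to violate, so here is a self-contained illustration in plain Go (loadAll is a hypothetical stand-in for LoadBlobsFromPack): a callback that retains buf sees it overwritten by the next blob, while an explicit copy stays intact.

package main

import "fmt"

// loadAll mimics LoadBlobsFromPack's buffer handling: a single scratch buffer
// is handed to every callback invocation.
func loadAll(items []string, fn func(buf []byte) error) error {
	var buf []byte
	for _, it := range items {
		buf = append(buf[:0], it...)
		if err := fn(buf); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var retained, copied [][]byte
	_ = loadAll([]string{"first", "later"}, func(buf []byte) error {
		retained = append(retained, buf)                     // keeps a reference: wrong
		copied = append(copied, append([]byte(nil), buf...)) // private copy: safe
		return nil
	})
	fmt.Printf("retained: %q\ncopied: %q\n", retained, copied)
}

Running it prints retained: ["later" "later"] but copied: ["first" "later"].
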

func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
@@ -990,14 +988,29 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack

lowerIdx := 0
lastPos := blobs[0].Offset
const maxChunkSize = 2 * DefaultPackSize

for i := 0; i < len(blobs); i++ {
if blobs[i].Offset < lastPos {
// don't wait for streamPackPart to fail
return errors.Errorf("overlapping blobs in pack %v", packID)
}

chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
split := false
// split if the chunk would become larger than maxChunkSize. Oversized chunks are
// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
split = true
}
// skip too large gaps as a new request is typically much cheaper than data transfers
if blobs[i].Offset-lastPos > maxUnusedRange {
split = true
}

if split {
// load everything up to the skipped file section
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
err := streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:i], handleBlobFn)
if err != nil {
return err
}
@@ -1006,68 +1019,119 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lastPos = blobs[i].Offset + blobs[i].Length
}
// load remainder
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
}
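
The two split conditions above are easiest to see with concrete numbers. A standalone sketch of the same loop with shrunken constants (the blob struct and values are illustrative, not restic's):

package main

import "fmt"

type blob struct{ offset, length uint }

const (
	maxChunkSize   = 100 // stand-in for 2 * DefaultPackSize
	maxUnusedRange = 10  // stand-in for restic's 1 MiB
)

// chunks applies streamPack's two split conditions to a sorted blob list.
func chunks(blobs []blob) [][]blob {
	var out [][]blob
	lowerIdx := 0
	lastPos := blobs[0].offset
	for i := range blobs {
		sizeAfter := blobs[i].offset + blobs[i].length - blobs[lowerIdx].offset
		// split if the chunk would grow past maxChunkSize (but keep >= 1 blob)
		split := i > lowerIdx && sizeAfter >= maxChunkSize
		// split if the gap is so wide that a new request is cheaper
		if blobs[i].offset-lastPos > maxUnusedRange {
			split = true
		}
		if split {
			out = append(out, blobs[lowerIdx:i])
			lowerIdx = i
		}
		lastPos = blobs[i].offset + blobs[i].length
	}
	return append(out, blobs[lowerIdx:])
}

func main() {
	fmt.Println(chunks([]blob{{0, 20}, {20, 20}, {70, 20}}))
}

With these inputs the 30-byte gap after the second blob triggers a split, so the pack is fetched as two ranges: [[{0 20} {20 20}] [{70 20}]].
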

func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}

dataStart := blobs[0].Offset
dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length

debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))

dec, err := zstd.NewReader(nil)
data := make([]byte, int(dataEnd-dataStart))
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
_, cerr := io.ReadFull(rd, data)
return cerr
})
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
panic(dec)
// the context is only still valid if handleBlobFn never returned an error
if loadBlobFn != nil {
// check whether we can get the remaining blobs somewhere else
for _, entry := range blobs {
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
err = handleBlobFn(entry.BlobHandle, buf, ierr)
if err != nil {
break
}
}
}
return errors.Wrap(err, "StreamPack")
}
defer dec.Close()

ctx, cancel := context.WithCancel(ctx)
// stream blobs in pack
err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
bufferSize := int(dataEnd - dataStart)
if bufferSize > MaxStreamBufferSize {
bufferSize = MaxStreamBufferSize
it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)

for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}
bufRd := bufio.NewReaderSize(rd, bufferSize)
it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)

for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}

err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
cancel()
return backoff.Permanent(err)
if val.Err != nil && loadBlobFn != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
if ierr == nil {
// success
val.Plaintext = buf
val.Err = nil
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}
return nil
})

err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
return err
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}

return errors.Wrap(err, "StreamPack")
}
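
The recovery path in streamPackPart — on a failed pack download or a bad blob, ask the repository for another copy via loadBlobFn before reporting the error — reduces to this pattern (loadOther is a hypothetical stand-in for r.LoadBlob):

package main

import (
	"errors"
	"fmt"
)

// loadOther stands in for r.LoadBlob: look the blob up in any other pack.
func loadOther(id string) ([]byte, error) {
	if id == "b" {
		return []byte("copy of b"), nil // a duplicate exists elsewhere
	}
	return nil, errors.New("no other copy")
}

// handle mirrors the shape of the callback handling above.
func handle(id string, data []byte, err error) error {
	if err != nil {
		// check whether we can get a valid copy somewhere else
		if buf, ierr := loadOther(id); ierr == nil {
			data, err = buf, nil
		}
	}
	if err != nil {
		return fmt.Errorf("blob %s unreadable: %w", id, err)
	}
	fmt.Printf("blob %s: %s\n", id, data)
	return nil
}

func main() {
	_ = handle("a", []byte("plaintext"), nil)
	_ = handle("b", nil, errors.New("decrypt failed"))
}
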

// discardReader allows the PackBlobIterator to perform zero copy
// reads if the underlying data source is a byte slice.
type discardReader interface {
Discard(n int) (discarded int, err error)
// ReadFull reads the next n bytes into a byte slice. The caller must not
// retain a reference to the byte slice. Modifications are only allowed within
// the boundaries of the returned slice.
ReadFull(n int) (buf []byte, err error)
}

type byteReader struct {
buf []byte
}

func newByteReader(buf []byte) *byteReader {
return &byteReader{
buf: buf,
}
}

func (b *byteReader) Discard(n int) (discarded int, err error) {
if len(b.buf) < n {
return 0, io.ErrUnexpectedEOF
}
b.buf = b.buf[n:]
return n, nil
}

func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
if len(b.buf) < n {
return nil, io.ErrUnexpectedEOF
}
buf = b.buf[:n]
b.buf = b.buf[n:]
return buf, nil
}
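
A quick standalone check of that zero-copy behavior (byteReader is re-declared so the snippet compiles on its own): ReadFull hands out a view into the original slice rather than a copy, and Discard merely advances the window.

package main

import (
	"fmt"
	"io"
)

type byteReader struct{ buf []byte }

func (b *byteReader) Discard(n int) (int, error) {
	if len(b.buf) < n {
		return 0, io.ErrUnexpectedEOF
	}
	b.buf = b.buf[n:]
	return n, nil
}

func (b *byteReader) ReadFull(n int) ([]byte, error) {
	if len(b.buf) < n {
		return nil, io.ErrUnexpectedEOF
	}
	buf := b.buf[:n]
	b.buf = b.buf[n:]
	return buf, nil
}

func main() {
	data := []byte("headerpayload")
	r := &byteReader{buf: data}
	_, _ = r.Discard(6)                       // skip "header"
	p, _ := r.ReadFull(7)                     // a view into data, not a copy
	fmt.Println(string(p), &p[0] == &data[6]) // payload true
}
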

type PackBlobIterator struct {
packID restic.ID
rd *bufio.Reader
rd discardReader
currentOffset uint

blobs []restic.Blob
key *crypto.Key
dec *zstd.Decoder

buf []byte
decode []byte
}

@@ -1079,7 +1143,7 @@ type PackBlobValue struct {

var ErrPackEOF = errors.New("reached EOF of pack file")

func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
return &PackBlobIterator{
packID: packID,
@@ -1114,21 +1178,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)

if uint(cap(b.buf)) < entry.Length {
b.buf = make([]byte, entry.Length)
}
b.buf = b.buf[:entry.Length]

n, err := io.ReadFull(b.rd, b.buf)
buf, err := b.rd.ReadFull(int(entry.Length))
if err != nil {
debug.Log(" read error %v", err)
return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
}

if n != len(b.buf) {
return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, b.packID.Str(), len(b.buf), n)
}
b.currentOffset = entry.Offset + entry.Length

if int(entry.Length) <= b.key.NonceSize() {
Expand All @@ -1137,7 +1192,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
}

// decryption errors are likely permanent, give the caller a chance to skip them
nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)
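
The nonce/ciphertext split follows the usual AEAD layout, in which each stored blob is nonce || ciphertext. A standalone sketch with stdlib AES-GCM standing in for restic's crypto.Key (the key and plaintext are made up):

package main

import (
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

func main() {
	key := make([]byte, 32) // all-zero key, for illustration only
	block, _ := aes.NewCipher(key)
	aead, _ := cipher.NewGCM(block)

	// a stored blob is nonce || ciphertext
	nonce := make([]byte, aead.NonceSize())
	blob := aead.Seal(nonce, nonce, []byte("blob plaintext"), nil)

	// split, then decrypt in place, like key.Open above
	n, ct := blob[:aead.NonceSize()], blob[aead.NonceSize():]
	pt, err := aead.Open(ct[:0], n, ct, nil)
	fmt.Println(string(pt), err) // blob plaintext <nil>
}
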