Skip to content

Commit

Permalink
Moved backup finishing function from TarBall (#199)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tinsane authored and x4m committed Mar 5, 2019
1 parent 1b6a8cb commit 6202d06
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 99 deletions.
76 changes: 56 additions & 20 deletions internal/backup_push_handler.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
package internal

import (
"bytes"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/wal-g/wal-g/internal/tracelog"
"os"
"path/filepath"
"strconv"
"time"
)

type SentinelMarshallingError struct {
error
}

func NewSentinelMarshallingError(sentinelName string, err error) SentinelMarshallingError {
return SentinelMarshallingError{errors.Wrapf(err, "Failed to marshall sentinel file: '%s'", sentinelName)}
}

func (err SentinelMarshallingError) Error() string {
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
}

// TODO : unit tests
func getDeltaConfig() (maxDeltas int, fromFull bool) {
stepsStr, hasSteps := os.LookupEnv("WALG_DELTA_MAX_STEPS")
Expand Down Expand Up @@ -134,31 +150,51 @@ func HandleBackupPush(archiveDirectory string, uploader *Uploader) {
}

timelineChanged := bundle.checkTimelineChanged(conn)
var currentBackupSentinelDto *BackupSentinelDto

if !timelineChanged {
currentBackupSentinelDto = &BackupSentinelDto{
BackupStartLSN: &backupStartLSN,
IncrementFromLSN: previousBackupSentinelDto.BackupStartLSN,
PgVersion: pgVersion,
}
if previousBackupSentinelDto.BackupStartLSN != nil {
currentBackupSentinelDto.IncrementFrom = &previousBackupName
if previousBackupSentinelDto.isIncremental() {
currentBackupSentinelDto.IncrementFullName = previousBackupSentinelDto.IncrementFullName
} else {
currentBackupSentinelDto.IncrementFullName = &previousBackupName
}
currentBackupSentinelDto.IncrementCount = &incrementCount
}
// Wait for all uploads to finish.
uploader.finish()
if !uploader.Success {
tracelog.ErrorLogger.Fatalf("Uploading failed during '%s' backup.\n", backupName)
}
if timelineChanged {
tracelog.ErrorLogger.Fatalf("Cannot finish backup because of changed timeline.")
}

currentBackupSentinelDto.setFiles(bundle.GetFiles())
currentBackupSentinelDto.BackupFinishLSN = &finishLsn
currentBackupSentinelDto := &BackupSentinelDto{
BackupStartLSN: &backupStartLSN,
IncrementFromLSN: previousBackupSentinelDto.BackupStartLSN,
PgVersion: pgVersion,
}
if previousBackupSentinelDto.BackupStartLSN != nil {
currentBackupSentinelDto.IncrementFrom = &previousBackupName
if previousBackupSentinelDto.isIncremental() {
currentBackupSentinelDto.IncrementFullName = previousBackupSentinelDto.IncrementFullName
} else {
currentBackupSentinelDto.IncrementFullName = &previousBackupName
}
currentBackupSentinelDto.IncrementCount = &incrementCount
}

// Wait for all uploads to finish.
err = bundle.TarBall.Finish(currentBackupSentinelDto)
currentBackupSentinelDto.setFiles(bundle.GetFiles())
currentBackupSentinelDto.BackupFinishLSN = &finishLsn
currentBackupSentinelDto.UserData = GetSentinelUserData()

// If other parts are successful in uploading, upload json file.
err = UploadSentinel(uploader, currentBackupSentinelDto, backupName)
if err != nil {
tracelog.ErrorLogger.Printf("Failed to upload sentinel file for backup: %s", backupName)
tracelog.ErrorLogger.FatalError(err)
}
}

// TODO : unit tests
func UploadSentinel(uploader *Uploader, sentinelDto *BackupSentinelDto, backupName string) error {
sentinelName := backupName + SentinelSuffix

dtoBody, err := json.Marshal(*sentinelDto)
if err != nil {
return NewSentinelMarshallingError(sentinelName, err)
}

return uploader.Upload(sentinelName, bytes.NewReader(dtoBody))
}
5 changes: 0 additions & 5 deletions internal/nop_tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package internal

import (
"archive/tar"
"fmt"
)

// NOPTarBall mocks a tarball. Used for testing purposes.
Expand All @@ -14,10 +13,6 @@ type NOPTarBall struct {

func (tarBall *NOPTarBall) SetUp(crypter Crypter, params ...string) {}
func (tarBall *NOPTarBall) CloseTar() error { return nil }
func (tarBall *NOPTarBall) Finish(sentinelDto *BackupSentinelDto) error {
fmt.Printf("NOP: %d files.\n", tarBall.number)
return nil
}

func (tarBall *NOPTarBall) Size() int64 { return tarBall.size }
func (tarBall *NOPTarBall) AddSize(i int64) { tarBall.size += i }
Expand Down
51 changes: 0 additions & 51 deletions internal/storage_tar_ball.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,12 @@ package internal

import (
"archive/tar"
"bytes"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/wal-g/wal-g/internal/tracelog"
"io"
)

type NoSentinelUploadError struct {
error
}

func NewNoSentinelUploadError() NoSentinelUploadError {
return NoSentinelUploadError{errors.New("Sentinel was not uploaded due to timeline change during backup")}
}

func (err NoSentinelUploadError) Error() string {
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
}

// StorageTarBall represents a tar file that is
// going to be uploaded to storage.
type StorageTarBall struct {
Expand Down Expand Up @@ -120,40 +106,3 @@ func (tarBall *StorageTarBall) Size() int64 { return tarBall.size }
func (tarBall *StorageTarBall) AddSize(i int64) { tarBall.size += i }

func (tarBall *StorageTarBall) TarWriter() *tar.Writer { return tarBall.tarWriter }

// Finish writes a .json file description and uploads it with the
// the backup name. Finish will wait until all tar file parts
// have been uploaded. The json file will only be uploaded
// if all other parts of the backup are present in storage.
// an alert is given with the corresponding error.
func (tarBall *StorageTarBall) Finish(sentinelDto *BackupSentinelDto) error {
name := tarBall.backupName + SentinelSuffix
uploader := tarBall.uploader

uploader.finish()

var err error
// If other parts are successful in uploading, upload json file.
if uploader.Success && sentinelDto != nil {
sentinelDto.UserData = GetSentinelUserData()
dtoBody, err := json.Marshal(*sentinelDto)
if err != nil {
return err
}

uploadingErr := uploader.Upload(name, bytes.NewReader(dtoBody))
if uploadingErr != nil {
tracelog.ErrorLogger.Printf("upload: could not upload '%s'\n", name)
tracelog.ErrorLogger.Fatalf("StorageTarBall finish: json failed to upload")
}
} else {
tracelog.InfoLogger.Printf("Uploaded %d compressed tar Files.\n", tarBall.partNumber)
tracelog.ErrorLogger.Printf("Sentinel was not uploaded %v", name)
return NewNoSentinelUploadError()
}

if err == nil && uploader.Success {
tracelog.InfoLogger.Printf("Uploaded %d compressed tar Files.\n", tarBall.partNumber)
}
return err
}
1 change: 0 additions & 1 deletion internal/tar_ball.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
type TarBall interface {
SetUp(crypter Crypter, args ...string)
CloseTar() error
Finish(sentinelDto *BackupSentinelDto) error
Size() int64
AddSize(int64)
TarWriter() *tar.Writer
Expand Down
5 changes: 0 additions & 5 deletions test/tar_ball_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,11 @@ func TestS3DependentFunctions(t *testing.T) {
_, err = tarBall.TarWriter().Write(mockData)
assert.Error(t, err)

err = tarBall.Finish(&internal.BackupSentinelDto{})
assert.NoError(t, err)

// Test naming property of SetUp().
bundle.NewTarBall(false)
tarBall = bundle.TarBall
tarBall.SetUp(MockArmedCrypter(), "mockTarball")
tarBall.CloseTar()
err = tarBall.Finish(&internal.BackupSentinelDto{})
assert.NoError(t, err)
}

func TestPackFileTo(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions test/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,11 @@ func TestUploadError(t *testing.T) {
tarBall := maker.Make(true)
tarBall.SetUp(MockArmedCrypter())

tarBall.Finish(&internal.BackupSentinelDto{})
assert.False(t, uploader.Success)

uploader = testtools.NewMockUploader(true, false)

tarBall = maker.Make(true)
tarBall.SetUp(MockArmedCrypter())
tarBall.Finish(&internal.BackupSentinelDto{})
assert.False(t, uploader.Success)
}
5 changes: 0 additions & 5 deletions test/walk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,6 @@ func TestWalk(t *testing.T) {
t.Log(err)
}

err = bundle.TarBall.Finish(&internal.BackupSentinelDto{})
if err != nil {
t.Log(err)
}

// Test that sentinel exists and is handled correctly.
sen := bundle.Sentinel.Info.Name()
assert.Equal(t, internal.PgControl, sen)
Expand Down
10 changes: 0 additions & 10 deletions testtools/tarballs.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ func (tarBall *FileTarBall) CloseTar() error {
return tarBall.writeCloser.Close()
}

// Finish alerts that compression is complete.
func (tarBall *FileTarBall) Finish(sentinelDto *internal.BackupSentinelDto) error {
fmt.Printf("Wrote %d compressed tar files to %s.\n", tarBall.number, tarBall.out)
return nil
}

func (tarBall *FileTarBall) Size() int64 { return tarBall.size }
func (tarBall *FileTarBall) AddSize(i int64) { tarBall.size += i }
func (tarBall *FileTarBall) TarWriter() *tar.Writer { return tarBall.tarWriter }
Expand All @@ -94,10 +88,6 @@ func (tarBall *BufferTarBall) CloseTar() error {
return tarBall.tarWriter.Close()
}

func (tarBall *BufferTarBall) Finish(sentinelDto *internal.BackupSentinelDto) error {
return nil
}

func (tarBall *BufferTarBall) Size() int64 {
return tarBall.size
}
Expand Down

0 comments on commit 6202d06

Please sign in to comment.