Skip to content

Commit

Permalink
Move WAL overwrite prevention under switch and add precaution checks (#…
Browse files Browse the repository at this point in the history
…109)

* Move WAL overwrite under switch and add precaution checks for archive_mode=always and the parallel upload fails

* Add docs about WALG_PREVENAT_WAL_OVERWRITE
  • Loading branch information
x4m committed Aug 10, 2018
1 parent c5e36cb commit 28b6003
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 67 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ To configure how many concurrency streams are reading disk during ```backup-push

* `WALG_SENTINEL_USER_DATA`

This setting allows backup automation tools to add extra information to JSON sentinel file during ```backup-push```.
This setting allows backup automation tools to add extra information to JSON sentinel file during ```backup-push```. This setting can be used e.g. to give user-defined names to backups.

* `WALG_PREVENT_WAL_OVERWRITE`

If this setting is specified, during ```wal-push``` WAL-G will check the existence of WAL before uploading it. If the different file is already archived under the same name, WAL-G will return the non-zero exit code to prevent PostgreSQL from removing WAL.

* `AWS_ENDPOINT`

Expand Down
2 changes: 1 addition & 1 deletion bguploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func haveNoSlots(u *BgUploader) bool {
// Upload one WAL file
func (uploader *BgUploader) Upload(info os.FileInfo) {
walfilename := strings.TrimSuffix(info.Name(), readySuffix)
UploadWALFile(uploader.tarUploader.Clone(), filepath.Join(uploader.dir, walfilename), uploader.pre, uploader.verify)
UploadWALFile(uploader.tarUploader.Clone(), filepath.Join(uploader.dir, walfilename), uploader.pre, uploader.verify, true)

ready := filepath.Join(uploader.dir, archiveStatus, info.Name())
done := filepath.Join(uploader.dir, archiveStatus, walfilename+done)
Expand Down
74 changes: 35 additions & 39 deletions bguploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"github.com/wal-g/wal-g"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"testing"
"time"
"math/rand"
)

// This test has known race condition
Expand Down Expand Up @@ -58,50 +58,44 @@ func TestBackgroundWALUpload(t *testing.T) {

func TestBackgroundNoOverwriteWALUpload(t *testing.T) {
var overwriteDir = "overwritetestdata"
if os.Getenv("NO_OVERWRITE_TEST") == "1" {
_, a := setupArchiveStatus(t, overwriteDir)

addTestDataFile(t, overwriteDir, 0)

// Re-use generated data to test uploading WAL.
tu := walg.NewLz4MockTarUploader()
tu.UploaderApi = &mockS3Uploader{}
pre := &walg.S3Prefix{
Svc: &mockS3Client{
err: false,
notFound: false,
},
Bucket: aws.String("mock bucket"),
Server: aws.String("mock server"),
}
bu := walg.BgUploader{}

// Look for new WALs while doing main upload
bu.Start(a, 16, tu, pre, false)
time.Sleep(time.Second) //time to spin up new uploaders
bu.Stop()
_, a := setupArchiveStatus(t, overwriteDir)

t.Error("did not exit from not overwriting")
// Re-use generated data to test uploading WAL.
tu := walg.NewLz4MockTarUploader()
tu.UploaderApi = &mockS3Uploader{}
pre := &walg.S3Prefix{
Svc: &mockS3Client{
err: false,
notFound: false,
},
Bucket: aws.String("mock bucket"),
Server: aws.String("mock server"),
PreventWalOverwrite: true,
}

// Here we start this test in separate process to verify panic
// We cannot just call it and recovery since panic is handled in async goroutine
// One day we sill replace all panics with error handling, until then this is OK
cmd := exec.Command(os.Args[0], "-test.run=TestBackgroundNoOverwriteWALUpload")
cmd.Env = append(os.Environ(), "NO_OVERWRITE_TEST=1")
err := cmd.Run()
if e, ok := err.(*exec.ExitError); ok && !e.Success() {
bname := "B0"
bd := filepath.Join(overwriteDir, "archive_status", bname+".ready")
_, err := os.Stat(bd)
if os.IsNotExist(err) {
t.Error(bname + ".ready was deleted")
wasPanic := false
defer func() {
if r := recover(); r != nil {
t.Log("Recovered ", r)
wasPanic = true
}
}()
walg.UploadWALFile(tu, a, pre, false, false)
if !wasPanic {
t.Errorf("WAL was overwritten")
}

cleanup(t, overwriteDir)
return
file, err := os.OpenFile(a, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0666)
if err != nil {
t.Error(err)
}
t.Fatalf("process ran with err %v, want exit status 1", err)
file.WriteString("Hello")
file.Close()
walg.UploadWALFile(tu, a, pre, false, false)
// Should not panic
walg.UploadWALFile(tu, a, pre, false, false)

cleanup(t, overwriteDir)
}

func setupArchiveStatus(t *testing.T, dir string) (string, string) {
Expand Down Expand Up @@ -137,6 +131,8 @@ func setupArchiveStatus(t *testing.T, dir string) (string, string) {
if err != nil {
t.Log(err)
}
file.WriteString(strconv.Itoa(rand.Int()))
file.WriteString(strconv.Itoa(rand.Int()))
file.Close()

return testDir, a
Expand Down
81 changes: 59 additions & 22 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/pkg/errors"
"sync"
"bytes"
)

// HandleDelete is invoked to perform wal-g delete
Expand Down Expand Up @@ -544,7 +545,7 @@ func tryDownloadWALFile(pre *S3Prefix, walFullPath string) (archiveReader io.Rea
return
}

func decompressWALFile(archiveReader io.ReadCloser, dstLocation string, decompressor Decompressor) error {
func decompressWALFile(archiveReader io.ReadCloser, file io.WriteCloser, decompressor Decompressor) error {
crypter := OpenPGPCrypter{}
if crypter.IsUsed() {
reader, err := crypter.Decrypt(archiveReader)
Expand All @@ -554,12 +555,7 @@ func decompressWALFile(archiveReader io.ReadCloser, dstLocation string, decompre
archiveReader = ReadCascadeCloser{reader, archiveReader}
}

file, err := os.OpenFile(dstLocation, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0666)
if err != nil {
return err
}

err = decompressor.Decompress(file, archiveReader)
err := decompressor.Decompress(file, archiveReader)
if err != nil {
return err
}
Expand All @@ -576,7 +572,13 @@ func DownloadAndDecompressWALFile(pre *S3Prefix, walFileName string, dstLocation
if !exists {
continue
}
err = decompressWALFile(archiveReader, dstLocation, decompressor)

file, err := os.OpenFile(dstLocation, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0666)
if err != nil {
log.Fatalf("%+v\n", err)
}

err = decompressWALFile(archiveReader, file, decompressor)
if err != nil {
log.Fatalf("%+v\n", err)
}
Expand All @@ -591,24 +593,21 @@ func HandleWALPush(tarUploader *TarUploader, dirArc string, pre *S3Prefix, verif
// Look for new WALs while doing main upload
bgUploader.Start(dirArc, int32(getMaxUploadConcurrency(16)-1), tarUploader, pre, verify)

UploadWALFile(tarUploader, dirArc, pre, verify)
UploadWALFile(tarUploader, dirArc, pre, verify, false)

bgUploader.Stop()
}

// UploadWALFile from FS to the cloud
func UploadWALFile(tarUploader *TarUploader, dirArc string, pre *S3Prefix, verify bool) {
archive := &Archive{
Prefix: pre,
Archive: aws.String(sanitizePath(tarUploader.server + WalPath + filepath.Base(dirArc) + "." + tarUploader.compressor.FileExtension())),
}

exists, err := archive.CheckExistence()
if err != nil {
log.Fatalf("FATAL %+v\n", err)
}
if exists {
log.Fatalf("FATAL WAL file '%s' already archived, not overwriting", dirArc)
func UploadWALFile(tarUploader *TarUploader, dirArc string, pre *S3Prefix, verify bool, bkgUpload bool) {
if pre.PreventWalOverwrite {
if checkWALOverwrite(pre, tarUploader, dirArc) {
if !bkgUpload {
log.Panicf("FATAL: WAL file '%s' already archived, contents differ, unable to overwrite\n", dirArc)
}
log.Printf("WARNING: WAL file '%s' already archived, contents differ, unable to overwrite\n", dirArc)
return
}
}

path, err := tarUploader.UploadWal(dirArc, pre, verify)
Expand All @@ -617,6 +616,44 @@ func UploadWALFile(tarUploader *TarUploader, dirArc string, pre *S3Prefix, verif
}
if err != nil {
log.Printf("upload: could not upload '%s'\n", path)
log.Fatalf("FATAL %v\n", err)
log.Fatalf("FATAL: %v\n", err)
}
}

func checkWALOverwrite(pre *S3Prefix, tarUploader *TarUploader, dirArc string) (overwriteAttempt bool) {
archiveReader, exists, err := tryDownloadWALFile(pre,
sanitizePath(tarUploader.server+WalPath+filepath.Base(dirArc)+"."+tarUploader.compressor.FileExtension()))
if err != nil {
log.Fatalf("%+v\n", err)
}
if exists {
archived := &bytes.Buffer{}

err = decompressWALFile(archiveReader, &bufCloser{archived}, getDecompressorByCompressor(tarUploader.compressor))

local, err := os.Open(dirArc)
defer local.Close()

localBytes, err := ioutil.ReadAll(local)

if err != nil {
log.Fatalf("FATAL: %+v\n", err)
}

if !bytes.Equal(archived.Bytes(), localBytes) {
return true
} else {
log.Printf("WARNING: WAL file '%s' already archived, archived content equals\n", dirArc)
return false
}
}
return false
}

type bufCloser struct {
*bytes.Buffer
}

func (w *bufCloser) Close() error {
return nil
}
10 changes: 10 additions & 0 deletions compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ var Decompressors = []Decompressor{
ZstdDecompressor{},
LzmaDecompressor{},
}

func getDecompressorByCompressor(compressor Compressor) Decompressor {
extension := compressor.FileExtension()
for _,d:=range Decompressors{
if d.FileExtension() == extension {
return d
}
}
return nil
}
7 changes: 4 additions & 3 deletions s3_prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import "github.com/aws/aws-sdk-go/service/s3/s3iface"

// S3Prefix contains the S3 service client, bucket and string.
type S3Prefix struct {
Svc s3iface.S3API
Bucket *string
Server *string
Svc s3iface.S3API
Bucket *string
Server *string
PreventWalOverwrite bool
}
11 changes: 10 additions & 1 deletion upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Configure() (*TarUploader, *S3Prefix, error) {
if len(s3ForcePathStyleStr) > 0 {
s3ForcePathStyle, err := strconv.ParseBool(s3ForcePathStyleStr)
if err != nil {
return nil, nil, errors.Wrap(err, "Configure: failed parse AWS_S3_FORCE_PATH_STYLE")
return nil, nil, errors.Wrap(err, "Configure: failed to parse AWS_S3_FORCE_PATH_STYLE")
}
config.S3ForcePathStyle = aws.Bool(s3ForcePathStyle)
}
Expand All @@ -120,6 +120,15 @@ func Configure() (*TarUploader, *S3Prefix, error) {
Server: aws.String(server),
}

preventWalOverwriteStr := os.Getenv("WALG_PREVENT_WAL_OVERWRITE")
if len(preventWalOverwriteStr) > 0 {
preventWalOverwrite, err := strconv.ParseBool(preventWalOverwriteStr)
if err != nil {
return nil, nil, errors.Wrap(err, "Configure: failed to parse WALG_PREVENT_WAL_OVERWRITE")
}
pre.PreventWalOverwrite = preventWalOverwrite
}

diskLimitStr := os.Getenv("WALG_DISK_RATE_LIMIT")
if diskLimitStr != "" {
diskLimit, err := strconv.ParseInt(diskLimitStr, 10, 64)
Expand Down

0 comments on commit 28b6003

Please sign in to comment.