Skip to content

Commit

Permalink
Fix prefetch of files near the end of archive
Browse files Browse the repository at this point in the history
  • Loading branch information
x4m committed Sep 6, 2018
1 parent 96ebbb9 commit 07ed635
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
17 changes: 10 additions & 7 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,10 @@ func HandleWALFetch(pre *S3Prefix, walFileName string, location string, triggerP
time.Sleep(50 * time.Millisecond)
}

DownloadAndDecompressWALFile(pre, walFileName, location)
err := DownloadAndDecompressWALFile(pre, walFileName, location)
if err != nil {
log.Fatalf("%+v\n", err)
}
}

func checkWALFileMagic(prefetched string) error {
Expand Down Expand Up @@ -563,28 +566,28 @@ func decompressWALFile(archiveReader io.ReadCloser, file io.WriteCloser, decompr
}

// DownloadAndDecompressWALFile downloads a file and writes it to local file
func DownloadAndDecompressWALFile(pre *S3Prefix, walFileName string, dstLocation string) {
func DownloadAndDecompressWALFile(pre *S3Prefix, walFileName string, dstLocation string) error {
for _, decompressor := range Decompressors {
archiveReader, exists, err := tryDownloadWALFile(pre, *pre.Server+WalPath+walFileName+"."+decompressor.FileExtension())
if err != nil {
log.Fatalf("%+v\n", err)
return err
}
if !exists {
continue
}

file, err := os.OpenFile(dstLocation, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0666)
if err != nil {
log.Fatalf("%+v\n", err)
return err
}

err = decompressWALFile(archiveReader, file, decompressor)
if err != nil {
log.Fatalf("%+v\n", err)
return err
}
return
return nil
}
log.Fatalf("Archive '%s' does not exist.\n", walFileName)
return errors.New("Archive '%s' does not exist " + walFileName)
}

// HandleWALPush is invoked to perform wal-g wal-push
Expand Down
5 changes: 4 additions & 1 deletion prefetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func prefetchFile(location string, pre *S3Prefix, walFileName string, wg *sync.W
log.Println("WAL-prefetch file: ", walFileName)
os.MkdirAll(runningLocation, 0755)

DownloadAndDecompressWALFile(pre, walFileName, oldPath)
err := DownloadAndDecompressWALFile(pre, walFileName, oldPath)
if err != nil {
return // something somewhere went wrong - prefetch will cleanup for itself
}

_, errO = os.Stat(oldPath)
_, errN = os.Stat(newPath)
Expand Down

0 comments on commit 07ed635

Please sign in to comment.