Skip to content

Commit

Permalink
Add pgbackrest wal-fetch and wal-show commands (#1202)
Browse files Browse the repository at this point in the history
* Write pgbackrest wal-fetch and wal-show commands

* Test pgbakcrest wal-fetch functionality

* Fix lint

* Download file without extension as last resort

* Fix pgbench command

* Fix build

* Fix pgbackrest test

* Update PostgreSQL.md

* Update PostgreSQL.md

* Add minor adjustments to DownloadAndDecompressStorageFile

Co-authored-by: usernamedt <usernamedt@yandex-team.com>
  • Loading branch information
Dm17r1y and usernamedt committed May 20, 2022
1 parent dfa01e4 commit 1eb88a5
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 44 deletions.
22 changes: 22 additions & 0 deletions cmd/pg/pgbackrest_wal_fetch.go
@@ -0,0 +1,22 @@
package pg

import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/databases/postgres/pgbackrest"
)

var pgbackrestWalFetchCmd = &cobra.Command{
Use: "wal-fetch wal_name destination_filename",
Short: WalFetchShortDescription,
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
folder, stanza := configurePgbackrestSettings()
err := pgbackrest.HandleWalFetch(folder, stanza, args[0], args[1])
tracelog.ErrorLogger.FatalOnError(err)
},
}

func init() {
pgbackrestCmd.AddCommand(pgbackrestWalFetchCmd)
}
32 changes: 32 additions & 0 deletions cmd/pg/pgbackrest_wal_show.go
@@ -0,0 +1,32 @@
package pg

import (
"os"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/internal/databases/postgres/pgbackrest"
)

var pgbackrestWalgShowCmd = &cobra.Command{
Use: "wal-show",
Short: WalShowUsage,
Long: WalShowLongDescription,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
folder, stanza := configurePgbackrestSettings()
outputType := postgres.TableOutput
if detailedJSONOutput {
outputType = postgres.JSONOutput
}
outputWriter := postgres.NewWalShowOutputWriter(outputType, os.Stdout, false)
err := pgbackrest.HandleWalShow(folder, stanza, outputWriter)
tracelog.ErrorLogger.FatalOnError(err)
},
}

func init() {
pgbackrestCmd.AddCommand(pgbackrestWalgShowCmd)
pgbackrestWalgShowCmd.Flags().BoolVar(&detailedJSONOutput, detailedOutputFlag, false, detailedOutputDescription)
}
17 changes: 17 additions & 0 deletions docker/pg_tests/scripts/tests/pgbackrest_backup_fetch_test.sh
Expand Up @@ -22,14 +22,21 @@ echo "archive_timeout = 600" >> ${PGDATA}/postgresql.conf
mkdir -m 770 /tmp/pgbackrest-backups

pgbackrest --stanza=main --pg1-path=${PGDATA} --repo1-path=/tmp/pgbackrest-backups stanza-create

pgbench -i -s 5 postgres
pgbench -c 2 -T 1000 &
pgbench_pid=$!

sleep 1
pgbackrest --stanza=main --pg1-path=${PGDATA} --repo1-path=/tmp/pgbackrest-backups backup
wait $pgbench_pid
pg_dumpall -f /tmp/dump1

/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w stop

s3cmd mb s3://pgbackrest-backups || echo "Bucket pgbackrest-backups already exists"
s3cmd sync /tmp/pgbackrest-backups/backup s3://pgbackrest-backups
s3cmd sync /tmp/pgbackrest-backups/archive s3://pgbackrest-backups

/tmp/scripts/drop_pg.sh
pgbackrest --stanza=main --pg1-path=${PGDATA} --repo1-path=/tmp/pgbackrest-backups restore
Expand All @@ -41,6 +48,16 @@ wal-g --config=${TMP_CONFIG} pgbackrest backup-fetch ${PGDATA} LATEST
tar --mtime='UTC 2019-01-01' --sort=name -cf /tmp/pg_data_actual.tar ${PGDATA}

diff /tmp/pg_data_expected.tar /tmp/pg_data_actual.tar
echo "Pgbackrest and wal-g backups are the same!"

echo "restore_command = 'wal-g --config=${TMP_CONFIG} pgbackrest wal-fetch \"%f\" \"%p\"'" > ${PGDATA}/recovery.conf
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA} -w start
/tmp/scripts/wait_while_pg_not_ready.sh
pg_dumpall -f /tmp/dump2

diff /tmp/dump1 /tmp/dump2

psql -f /tmp/scripts/amcheck.sql -v "ON_ERROR_STOP=1" postgres
echo "Backup success!!!!!!"

/tmp/scripts/drop_pg.sh
Expand Down
18 changes: 18 additions & 0 deletions docs/PostgreSQL.md
Expand Up @@ -565,3 +565,21 @@ Usage:
```bash
wal-g pgbackrest backup-fetch path/to/destination-directory backup-name
```

### ``pgbackrest wal-fetch``

Fetch wal file from pgbackrest backup

Usage:
```bash
wal-g pgbackrest wal-fetch example-archive new-file-name
```

### ``pgbackrest wal-show``

Show wal files from pgbackrest backup

Usage:
```bash
wal-g pgbackrest wal-show
```
10 changes: 5 additions & 5 deletions internal/databases/postgres/integrity_check_runner.go
Expand Up @@ -151,11 +151,11 @@ type IntegrityScanSegmentSequence struct {
func newIntegrityScanSegmentSequence(sequence *WalSegmentsSequence,
status ScannedSegmentStatus) *IntegrityScanSegmentSequence {
return &IntegrityScanSegmentSequence{
TimelineID: sequence.timelineID,
StartSegment: sequence.minSegmentNo.getFilename(sequence.timelineID),
EndSegment: sequence.maxSegmentNo.getFilename(sequence.timelineID),
TimelineID: sequence.TimelineID,
StartSegment: sequence.MinSegmentNo.getFilename(sequence.TimelineID),
EndSegment: sequence.MaxSegmentNo.getFilename(sequence.TimelineID),
Status: status,
SegmentsCount: len(sequence.walSegmentNumbers),
SegmentsCount: len(sequence.WalSegmentNumbers),
}
}

Expand Down Expand Up @@ -215,7 +215,7 @@ func collapseSegmentsByStatusAndTimeline(scannedSegments []ScannedSegmentDescrip
segment := scannedSegments[i]

// switch to the new sequence on segment Status change or timeline id change
if segment.status != currentStatus || currentSequence.timelineID != segment.Timeline {
if segment.status != currentStatus || currentSequence.TimelineID != segment.Timeline {
segmentSequences = append(segmentSequences, newIntegrityScanSegmentSequence(currentSequence, currentStatus))
currentSequence = NewSegmentsSequence(segment.Timeline, segment.Number)
currentStatus = segment.status
Expand Down
34 changes: 34 additions & 0 deletions internal/databases/postgres/pgbackrest/pgbackrest_settings.go
Expand Up @@ -2,6 +2,7 @@ package pgbackrest

import (
"encoding/json"
"fmt"

"github.com/wal-g/wal-g/pkg/storages/storage"
"gopkg.in/ini.v1"
Expand All @@ -11,11 +12,18 @@ const (
BackupPath = "backup"
BackupInfoIni = "backup.info"
BackupManifestIni = "backup.manifest"
WalArchivePath = "archive"
ArchiveInfo = "archive.info"

BackupFolderName = "backup"
BackupDataDirectory = "pg_data"
)

type ArchiveSettings struct {
DatabaseID int64 `ini:"db-id"`
DatabaseVersion string `ini:"db-version"`
}

type BackupSettings struct {
Name string
BackrestFormat int `json:"backrest-format"`
Expand Down Expand Up @@ -106,6 +114,32 @@ type DefaultPathSection struct {
User string `ini:"user"`
}

func GetArchiveName(folder storage.Folder, stanza string) (*string, error) {
archiveFolder := folder.GetSubFolder(WalArchivePath).GetSubFolder(stanza)
ioReader, err := archiveFolder.ReadObject(ArchiveInfo)
if err != nil {
return nil, err
}

cfg, err := ini.Load(ioReader)
if err != nil {
return nil, err
}

dbSection, err := cfg.GetSection("db")
if err != nil {
return nil, err
}

var settings ArchiveSettings
if err := dbSection.MapTo(&settings); err != nil {
return nil, err
}

archiveName := fmt.Sprintf("%s-%d", settings.DatabaseVersion, settings.DatabaseID)
return &archiveName, nil
}

func LoadBackupsSettings(folder storage.Folder, stanza string) ([]BackupSettings, error) {
backupFolder := folder.GetSubFolder(BackupPath).GetSubFolder(stanza)
ioReader, err := backupFolder.ReadObject(BackupInfoIni)
Expand Down
41 changes: 41 additions & 0 deletions internal/databases/postgres/pgbackrest/wal_fetch_handler.go
@@ -0,0 +1,41 @@
package pgbackrest

import (
"errors"
"path/filepath"
"strings"

"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

func HandleWalFetch(folder storage.Folder, stanza string, walFileName string, location string) error {
archiveName, err := GetArchiveName(folder, stanza)
if err != nil {
return err
}

archiveFolder := folder.GetSubFolder(WalArchivePath).GetSubFolder(stanza).GetSubFolder(*archiveName)
if strings.HasSuffix(walFileName, ".history") {
return internal.DownloadFileTo(archiveFolder, walFileName, location)
}

subdirectoryName := walFileName[0:16]
walFolder := archiveFolder.GetSubFolder(subdirectoryName)
if strings.HasSuffix(walFileName, ".backup") {
return internal.DownloadFileTo(walFolder, walFileName, location)
}
fileList, _, err := walFolder.ListFolder()
if err != nil {
return err
}

for _, file := range fileList {
fileName := file.GetName()
if strings.HasPrefix(fileName, walFileName) {
return internal.DownloadFileTo(walFolder, strings.TrimSuffix(fileName, filepath.Ext(fileName)), location)
}
}

return errors.New("File " + walFileName + " not found in storage")
}
101 changes: 101 additions & 0 deletions internal/databases/postgres/pgbackrest/wal_show_handler.go
@@ -0,0 +1,101 @@
package pgbackrest

import (
"path"
"sort"
"strings"

"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

func HandleWalShow(rootFolder storage.Folder, stanza string, outputWriter postgres.WalShowOutputWriter) error {
archiveName, err := GetArchiveName(rootFolder, stanza)
if err != nil {
return err
}

archiveFolder := rootFolder.GetSubFolder(WalArchivePath).GetSubFolder(stanza).GetSubFolder(*archiveName)
walFiles, err := getWalFiles(archiveFolder)
if err != nil {
return err
}

walSegments, err := getWalSegments(walFiles)
if err != nil {
return err
}
walSequencesByTimelines := getWalSequencesByTimelines(walSegments)

var timelineInfos []*postgres.TimelineInfo
for _, segmentsSequence := range walSequencesByTimelines {
historyRecords, err := postgres.GetTimeLineHistoryRecords(segmentsSequence.TimelineID, archiveFolder)
if err != nil {
if _, ok := err.(postgres.HistoryFileNotFoundError); !ok {
tracelog.ErrorLogger.Fatalf("Error while loading .history file %v\n", err)
}
}

info, err := postgres.NewTimelineInfo(segmentsSequence, historyRecords)
tracelog.ErrorLogger.FatalfOnError("Error while creating TimeLineInfo %v\n", err)
timelineInfos = append(timelineInfos, info)
}

sort.Slice(timelineInfos, func(i, j int) bool {
return timelineInfos[i].ID < timelineInfos[j].ID
})

return outputWriter.Write(timelineInfos)
}

func getWalSequencesByTimelines(segments []postgres.WalSegmentDescription) map[uint32]*postgres.WalSegmentsSequence {
segmentsByTimelines := make(map[uint32]*postgres.WalSegmentsSequence)
for _, segment := range segments {
if timelineInfo, ok := segmentsByTimelines[segment.Timeline]; ok {
timelineInfo.AddWalSegmentNo(segment.Number)
continue
}
segmentsByTimelines[segment.Timeline] = postgres.NewSegmentsSequence(segment.Timeline, segment.Number)
}
return segmentsByTimelines
}

func getWalSegments(filenames []string) ([]postgres.WalSegmentDescription, error) {
var segments []postgres.WalSegmentDescription
for _, filename := range filenames {
extension := path.Ext(filename)
if extension == ".backup" || extension == ".history" {
continue
}

segmentName := strings.Split(path.Base(filename), "-")[0]
segment, err := postgres.NewWalSegmentDescription(segmentName)
if err != nil {
return nil, err
}

segments = append(segments, segment)
}
return segments, nil
}

func getWalFiles(archiveFolder storage.Folder) ([]string, error) {
var walFiles []string
_, walDirectories, err := archiveFolder.ListFolder()
if err != nil {
return nil, err
}

for _, walDirectory := range walDirectories {
files, _, err := walDirectory.ListFolder()
if err != nil {
return nil, err
}

for _, file := range files {
walFiles = append(walFiles, path.Join(walDirectory.GetPath(), file.GetName()))
}
}
return walFiles, nil
}
4 changes: 2 additions & 2 deletions internal/databases/postgres/timeline_history_record.go
Expand Up @@ -68,7 +68,7 @@ func newHistoryRecordFromString(row string) (*TimelineHistoryRecord, error) {
func createTimelineSwitchMap(startTimeline uint32,
walFolder storage.Folder) (map[WalSegmentNo]*TimelineHistoryRecord, error) {
timeLineHistoryMap := make(map[WalSegmentNo]*TimelineHistoryRecord)
historyRecords, err := getTimeLineHistoryRecords(startTimeline, walFolder)
historyRecords, err := GetTimeLineHistoryRecords(startTimeline, walFolder)
if _, ok := err.(HistoryFileNotFoundError); ok {
// return empty map if not found any history
return timeLineHistoryMap, nil
Expand All @@ -84,7 +84,7 @@ func createTimelineSwitchMap(startTimeline uint32,
return timeLineHistoryMap, nil
}

func getTimeLineHistoryRecords(startTimeline uint32, walFolder storage.Folder) ([]*TimelineHistoryRecord, error) {
func GetTimeLineHistoryRecords(startTimeline uint32, walFolder storage.Folder) ([]*TimelineHistoryRecord, error) {
historyReadCloser, err := getHistoryFileFromStorage(startTimeline, walFolder)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions internal/databases/postgres/wal_restore_handler.go
Expand Up @@ -128,7 +128,7 @@ func GetMissingWals(lastSeg uint64, lastTl, currentTl uint32,
walsByTimelines map[uint32]*WalSegmentsSequence,
) ([]string, error) {
result := make([]string, 0)
currentSeg := uint64(walsByTimelines[currentTl].maxSegmentNo)
currentSeg := uint64(walsByTimelines[currentTl].MaxSegmentNo)

for ; currentTl >= lastTl; currentTl-- {
// Get wal segment sequence for current timeline
Expand All @@ -137,7 +137,7 @@ func GetMissingWals(lastSeg uint64, lastTl, currentTl uint32,
// Iterate over wal segment sequence for current timeline
for ; currentSeg >= tlToSeg[currentTl].segmentNo; currentSeg-- {
// Making sure that this wal segment sequence is correct and check for existing segment
if !ok || !walSegSeq.walSegmentNumbers[WalSegmentNo(currentSeg)] {
if !ok || !walSegSeq.WalSegmentNumbers[WalSegmentNo(currentSeg)] {
result = append(result, WalSegmentNo(currentSeg).getFilename(currentTl))
}

Expand Down

0 comments on commit 1eb88a5

Please sign in to comment.