Skip to content

Commit

Permalink
Pass Context around Folder upload functions, add PG WAL push timeout (#…
Browse files Browse the repository at this point in the history
…1546)

* Pass Context around Folder upload functions, add PG WAL push timeout

* Pass Context around Folder upload functions, add PG WAL push timeout

* Pass Context around Folder upload functions, add PG WAL push timeout

* Add PG WAL push timeout configuration

* Run Swift Folder test with docker. Fix Swift CopyObject method.

* Bump Swift library major version to github.com/ncw/swift/v2 v2.0.2

* PG daemon WAL push timeout - doc and small review fixes

* Add SSH Storage integration docker test

* Support GCS Folder internal timeout in PutObjectWithContext

* Stabilize Swift docker integration test

---------

Co-authored-by: Pavel Tolstikov <ne2pit@yandex-team.ru>
  • Loading branch information
ne2pit and Pavel Tolstikov committed Sep 11, 2023
1 parent ae85554 commit 5805948
Show file tree
Hide file tree
Showing 65 changed files with 772 additions and 259 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/dockertests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ jobs:
'make TEST="pg_delta_backup_fullscan_test" pg_integration_test',
'make TEST="pg_several_delta_backups_test" pg_integration_test',
'make TEST="pg_several_delta_backups_reverse_test" pg_integration_test',
'make TEST="pg_storage_swift_test" pg_integration_test',
'make TEST="pg_storage_ssh_test" pg_integration_test',
'make TEST="pg_delete_retain_find_full_test" pg_integration_test',
'make TEST="pg_wale_compatibility_test" pg_integration_test',
'make TEST="pg_delete_before_permanent_full_test" pg_integration_test',
Expand Down
4 changes: 2 additions & 2 deletions cmd/common/st/put_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package st
import (
"io"
"os"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
Expand Down Expand Up @@ -59,7 +59,7 @@ var putObjectCmd = &cobra.Command{
if err != nil {
return err
}
return storagetools.HandlePutObject(reader, dstPath, uploader, overwrite, !noEncrypt, !noCompress)
return storagetools.HandlePutObject(cmd.Context(), reader, dstPath, uploader, overwrite, !noEncrypt, !noCompress)
})
tracelog.ErrorLogger.FatalOnError(err)
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/gp/backup_push_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var (

backupHandler, err := greenplum.NewSegBackupHandler(arguments)
tracelog.ErrorLogger.FatalOnError(err)
backupHandler.HandleBackupPush()
backupHandler.HandleBackupPush(cmd.Context())
},
}
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pg/backup_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var (

backupHandler, err := postgres.NewBackupHandler(arguments)
tracelog.ErrorLogger.FatalOnError(err)
backupHandler.HandleBackupPush()
backupHandler.HandleBackupPush(cmd.Context())
},
}
permanent = false
Expand Down
2 changes: 1 addition & 1 deletion cmd/pg/catchup_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
Run: func(cmd *cobra.Command, args []string) {
internal.ConfigureLimiters()

postgres.HandleCatchupPush(args[0], postgres.LSN(fromLSN))
postgres.HandleCatchupPush(cmd.Context(), args[0], postgres.LSN(fromLSN))
},
}
fromLSN uint64
Expand Down
2 changes: 1 addition & 1 deletion cmd/pg/wal_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var walPushCmd = &cobra.Command{
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
walUploader := GetWalUploader()
err := postgres.HandleWALPush(walUploader, args[0])
err := postgres.HandleWALPush(cmd.Context(), walUploader, args[0])
tracelog.ErrorLogger.FatalOnError(err)
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/pg/wal_receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var walReceiveCmd = &cobra.Command{
tracelog.ErrorLogger.PrintError(err)
uploader.ArchiveStatusManager = asm.NewNopASM()
}
postgres.HandleWALReceive(uploader)
postgres.HandleWALReceive(cmd.Context(), uploader)
},
}

Expand Down
32 changes: 32 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ services:
image: wal-g/ubuntu
container_name: wal-g_ubuntu

swift:
image: openstackswift/saio:change_890691_py3 # "version": "2.32.0.dev213"
container_name: wal-g_swift
hostname: swift
ports:
- "8010:8080"

s3:
image: minio/minio:RELEASE.2021-06-07T21-40-51Z
container_name: wal-g_s3
Expand Down Expand Up @@ -175,6 +182,31 @@ services:
user: postgres
<<: *s3_common_depends

pg_storage_swift_test:
image: wal-g/docker_prefix
container_name: wal-g_pg_storage_swift_test
command: /tmp/test_bins/swift.test -test.v
depends_on:
- swift
links:
- swift
environment:
- "PG_TEST_STORAGE=swift"
- "OS_USERNAME=test:tester"
- "OS_PASSWORD=testing"
- "OS_AUTH_URL=http://swift:8080/auth/v1.0"

pg_storage_ssh_test:
image: wal-g/docker_prefix
container_name: wal-g_pg_storage_ssh_test
command: /tmp/test_bins/sh.test -test.v
depends_on:
- ssh
links:
- ssh
environment:
- "PG_TEST_STORAGE=ssh"

pg_ready_rename_test:
<<: *pg_test_common
container_name: archiving_ready_rename_test
Expand Down
5 changes: 5 additions & 0 deletions docker/pg_tests/Dockerfile_prefix
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ RUN sed -i 's|#cgo LDFLAGS: -lbrotli.*|&-static -lbrotlicommon-static -lm|' \
cd ../../cmd/daemonclient && \
go build -mod vendor -o walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S`"

RUN --mount=type=cache,target=/gocache cd pkg/storages/swift && GOCACHE=/gocache go test -v -c
RUN --mount=type=cache,target=/gocache cd pkg/storages/sh && GOCACHE=/gocache go test -v -c

FROM wal-g/pg:latest

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y netcat-openbsd && apt-get clean

COPY --from=build /go/src/github.com/wal-g/wal-g/pkg/storages/swift/swift.test /tmp/test_bins/
COPY --from=build /go/src/github.com/wal-g/wal-g/pkg/storages/sh/sh.test /tmp/test_bins/
COPY --from=build /go/src/github.com/wal-g/wal-g/main/pg/wal-g /usr/bin
COPY --from=build /go/src/github.com/wal-g/wal-g/cmd/daemonclient/walg-daemon-client /usr/bin

Expand Down
6 changes: 6 additions & 0 deletions docs/PostgreSQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,12 @@ Usage:
wal-g daemon path/to/socket-descriptor
```

Configuration:

* `WALG_DAEMON_WAL_UPLOAD_TIMEOUT`

To configure time limit for every WAL archive in daemon. Hanging for a longer time operations will be interrupted. Default value is 60s.

pgBackRest backups support (beta version)
-----------
### ``pgbackrest backup-list``
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ require (
github.com/jackc/pgproto3/v2 v2.0.7
github.com/jackc/pgx v3.6.0+incompatible
github.com/jedib0t/go-pretty v4.3.0+incompatible
github.com/magiconair/properties v1.8.1
github.com/magiconair/properties v1.8.1 // indirect
github.com/minio/sio v0.2.0
github.com/mongodb/mongo-tools-common v2.0.1+incompatible
github.com/ncw/swift v1.0.49
github.com/pierrec/lz4/v4 v4.1.11
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.11.0
Expand All @@ -56,13 +55,16 @@ require (
)

require (
github.com/ProtonMail/go-crypto v0.0.0-20230426101702-58e86b294756
github.com/cactus/go-statsd-client/v5 v5.0.0
github.com/google/brotli/go/cbrotli v0.0.0-20220110100810-f4153a09f87c
github.com/klauspost/compress v1.15.12
github.com/ncw/swift/v2 v2.0.2
github.com/pkg/profile v1.6.0
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/client_model v0.2.0
golang.org/x/mod v0.8.0
golang.org/x/sys v0.6.0
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -76,7 +78,6 @@ require (
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20230426101702-58e86b294756 // indirect
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
Expand Down Expand Up @@ -149,7 +150,6 @@ require (
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,8 @@ github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae h1:VeRdUYdCw49yizlSbM
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/ncw/swift v1.0.49 h1:eQaKIjSt/PXLKfYgzg01nevmO+CMXfXGRhB1gOhDs7E=
github.com/ncw/swift v1.0.49/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/ncw/swift/v2 v2.0.2 h1:jx282pcAKFhmoZBSdMcCRFn9VWkoBIRsCpe+yZq7vEk=
github.com/ncw/swift/v2 v2.0.2/go.mod h1:z0A9RVdYPjNjXVo2pDOPxZ4eu3oarO1P91fTItcb+Kg=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU=
Expand Down
9 changes: 6 additions & 3 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const (
PgFailoverStorages = "WALG_FAILOVER_STORAGES"
PgFailoverStoragesCheckTimeout = "WALG_FAILOVER_STORAGES_CHECK_TIMEOUT"
PgFailoverStorageCacheLifetime = "WALG_FAILOVER_STORAGES_CACHE_LIFETIME"
PgDaemonWALUploadTimeout = "WALG_DAEMON_WAL_UPLOAD_TIMEOUT"

ProfileSamplingRatio = "PROFILE_SAMPLING_RATIO"
ProfileMode = "PROFILE_MODE"
Expand Down Expand Up @@ -250,9 +251,10 @@ var (
}

PGDefaultSettings = map[string]string{
PgWalSize: "16",
PgBackRestStanza: "main",
PgAliveCheckInterval: "1m",
PgWalSize: "16",
PgBackRestStanza: "main",
PgAliveCheckInterval: "1m",
PgDaemonWALUploadTimeout: "60s",
}

GPDefaultSettings = map[string]string{
Expand Down Expand Up @@ -421,6 +423,7 @@ var (
PgFailoverStorages: true,
PgFailoverStoragesCheckTimeout: true,
PgFailoverStorageCacheLifetime: true,
PgDaemonWALUploadTimeout: true,
}

MongoAllowedSettings = map[string]bool{
Expand Down
121 changes: 121 additions & 0 deletions internal/contextio/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Source: https://github.com/dolmen-go/contextio

/*
Copyright 2018 Olivier Mengué
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package contextio provides [io.Writer] and [io.Reader] that stop accepting/providing
// data when an attached context is canceled.
package contextio

import (
"context"
"io"
)

type writer struct {
ctx context.Context
w io.Writer
}

type copier struct {
writer
}

// NewWriter wraps an [io.Writer] to handle context cancellation.
//
// Context state is checked BEFORE every Write.
//
// The returned Writer also implements [io.ReaderFrom] to allow [io.Copy] to select
// the best strategy while still checking the context state before every chunk transfer.
func NewWriter(ctx context.Context, w io.Writer) io.Writer {
if w, ok := w.(*copier); ok && ctx == w.ctx {
return w
}
return &copier{writer{ctx: ctx, w: w}}
}

// Write implements [io.Writer], but with context awareness.
func (w *writer) Write(p []byte) (n int, err error) {
select {
case <-w.ctx.Done():
return 0, w.ctx.Err()
default:
return w.w.Write(p)
}
}

type reader struct {
ctx context.Context
r io.Reader
}

// NewReader wraps an [io.Reader] to handle context cancellation.
//
// Context state is checked BEFORE every Read.
func NewReader(ctx context.Context, r io.Reader) io.Reader {
if r, ok := r.(*reader); ok && ctx == r.ctx {
return r
}
return &reader{ctx: ctx, r: r}
}

func (r *reader) Read(p []byte) (n int, err error) {
select {
case <-r.ctx.Done():
return 0, r.ctx.Err()
default:
return r.r.Read(p)
}
}

// ReadFrom implements interface [io.ReaderFrom], but with context awareness.
//
// This should allow efficient copying allowing writer or reader to define the chunk size.
func (w *copier) ReadFrom(r io.Reader) (n int64, err error) {
if _, ok := w.w.(io.ReaderFrom); ok {
// Let the original Writer decide the chunk size.
return io.Copy(w.writer.w, &reader{ctx: w.ctx, r: r})
}
select {
case <-w.ctx.Done():
return 0, w.ctx.Err()
default:
// The original Writer is not a ReaderFrom.
// Let the Reader decide the chunk size.
return io.Copy(&w.writer, r)
}
}

// NewCloser wraps an [io.Reader] to handle context cancellation.
//
// Context state is checked BEFORE any Close.
func NewCloser(ctx context.Context, c io.Closer) io.Closer {
return &closer{ctx: ctx, c: c}
}

type closer struct {
ctx context.Context
c io.Closer
}

func (c *closer) Close() error {
select {
case <-c.ctx.Done():
return c.ctx.Err()
default:
return c.c.Close()
}
}

0 comments on commit 5805948

Please sign in to comment.