Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add retry config to BucketHandle #5170

Merged
merged 10 commits into from Dec 2, 2021
46 changes: 36 additions & 10 deletions storage/bucket.go
Expand Up @@ -44,6 +44,7 @@ type BucketHandle struct {
defaultObjectACL ACLHandle
conds *BucketConditions
userProject string // project for Requester Pays buckets
retry *retryConfig
}

// Bucket returns a BucketHandle, which provides operations on the named bucket.
Expand Down Expand Up @@ -95,7 +96,7 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck
if attrs != nil && attrs.PredefinedDefaultObjectACL != "" {
req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL)
}
return runWithRetry(ctx, func() error { _, err := req.Context(ctx).Do(); return err })
return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true)
}

// Delete deletes the Bucket.
Expand All @@ -107,7 +108,8 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) {
if err != nil {
return err
}
return runWithRetry(ctx, func() error { return req.Context(ctx).Do() })

return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true)
}

func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) {
Expand Down Expand Up @@ -156,6 +158,7 @@ func (b *BucketHandle) Object(name string) *ObjectHandle {
},
gen: -1,
userProject: b.userProject,
retry: b.retry.clone(),
}
}

Expand All @@ -169,10 +172,10 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error
return nil, err
}
var resp *raw.Bucket
err = runWithRetry(ctx, func() error {
err = run(ctx, func() error {
resp, err = req.Context(ctx).Do()
return err
})
}, b.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrBucketNotExist
Expand Down Expand Up @@ -210,12 +213,20 @@ func (b *BucketHandle) Update(ctx context.Context, uattrs BucketAttrsToUpdate) (
if uattrs.PredefinedDefaultObjectACL != "" {
req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL)
}
// TODO(jba): retry iff metagen is set?
rb, err := req.Context(ctx).Do()
if err != nil {

isIdempotent := b.conds != nil && b.conds.MetagenerationMatch != 0
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved

var rawBucket *raw.Bucket
call := func() error {
rb, err := req.Context(ctx).Do()
rawBucket = rb
return err
}

if err := run(ctx, call, b.retry, isIdempotent); err != nil {
return nil, err
}
return newBucket(rb)
return newBucket(rawBucket)
}

func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPatchCall, error) {
Expand Down Expand Up @@ -1081,10 +1092,10 @@ func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error {
metageneration = b.conds.MetagenerationMatch
}
req := b.c.raw.Buckets.LockRetentionPolicy(b.name, metageneration)
return runWithRetry(ctx, func() error {
return run(ctx, func() error {
_, err := req.Context(ctx).Do()
return err
})
}, b.retry, true)
}

// applyBucketConds modifies the provided call using the conditions in conds.
Expand Down Expand Up @@ -1367,6 +1378,21 @@ func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator {
return it
}

// Retryer returns a bucket handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
// Retry options set on a object handle will take precedence over options set on
// the bucket handle.
func (b *BucketHandle) Retryer(opts ...RetryOption) *BucketHandle {
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
b2 := *b
retry := &retryConfig{}
for _, opt := range opts {
opt.apply(retry)
}
b2.retry = retry
return &b2
}

// An ObjectIterator is an iterator over ObjectAttrs.
//
// Note: This iterator is not safe for concurrent operations without explicit synchronization.
Expand Down
88 changes: 88 additions & 0 deletions storage/bucket_test.go
Expand Up @@ -23,6 +23,7 @@ import (

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
)
Expand Down Expand Up @@ -717,3 +718,90 @@ func TestNewBucket(t *testing.T) {
t.Errorf("got=-, want=+:\n%s", diff)
}
}

func TestBucketRetryer(t *testing.T) {
testCases := []struct {
name string
call func(b *BucketHandle) *BucketHandle
want *retryConfig
}{
{
name: "all defaults",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer()
},
want: &retryConfig{},
},
{
name: "set all options",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}),
WithPolicy(RetryAlways),
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
backoff: &gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
},
policy: RetryAlways,
shouldRetry: func(err error) bool { return false },
},
},
{
name: "set some backoff options",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithBackoff(gax.Backoff{
Multiplier: 3,
}))
},
want: &retryConfig{
backoff: &gax.Backoff{
Multiplier: 3,
}},
},
{
name: "set policy only",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(WithPolicy(RetryNever))
},
want: &retryConfig{
policy: RetryNever,
},
},
{
name: "set ErrorFunc only",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
shouldRetry: func(err error) bool { return false },
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(s *testing.T) {
b := tc.call(&BucketHandle{})
if diff := cmp.Diff(
b.retry,
tc.want,
cmp.AllowUnexported(retryConfig{}, gax.Backoff{}),
// ErrorFunc cannot be compared directly, but we check if both are
// either nil or non-nil.
cmp.Comparer(func(a, b func(err error) bool) bool {
return (a == nil && b == nil) || (a != nil && b != nil)
}),
); diff != "" {
s.Fatalf("retry not configured correctly: %v", diff)
}
})
}
}
142 changes: 142 additions & 0 deletions storage/integration_test.go
Expand Up @@ -51,6 +51,7 @@ import (
"cloud.google.com/go/internal/uid"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
Expand Down Expand Up @@ -4448,6 +4449,147 @@ func findTestCredentials(ctx context.Context, envVar string, scopes ...string) (
return transport.Creds(ctx, opts...)
}

func TestIntegration_RetryConfig(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this test is a bit hacky since it depends on an unrealistic shouldRetry method. Do we really need an integration test? I feel like it's probably sufficient to just to write a unit test that verifies that the configs are what we expect them to be, if options are set on either or both of the BucketHandle and ObjectHandle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. switched it to unit test

ctx := context.Background()
client := testConfig(ctx, t)
defer client.Close()

bucket := client.Bucket(bucketName)

var numTries int
// with numTries = 2, this function sets the config to retry once (if at all)
shouldRetry := func(err error) bool {
numTries--
return numTries > 0
}

testCases := []struct {
name string
bucketOptions []RetryOption
objectOptions []RetryOption
shouldHaveTriesLeft bool
shouldBeQuickerThan time.Duration // backoff options should have a large difference in values so that this condition can be checked
}{
{
name: "object retryer is used",
objectOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: false,
},
{
name: "bucket retryer is used",
bucketOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: false,
},
{
name: "object retryer overrides bucket retryer",
bucketOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
objectOptions: []RetryOption{
WithPolicy(RetryNever),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: true,
},
{
name: "object retryer overrides bucket retryer backoff options",
bucketOptions: []RetryOption{
WithBackoff(gax.Backoff{
Initial: time.Minute,
Max: time.Hour,
Multiplier: 6,
}),
WithErrorFunc(shouldRetry),
},
objectOptions: []RetryOption{
WithBackoff(gax.Backoff{
Initial: time.Nanosecond,
Max: time.Microsecond,
}),
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: false,
shouldBeQuickerThan: time.Second,
},
{
name: "object retryer does not override bucket retryer if option is not set",
bucketOptions: []RetryOption{
WithPolicy(RetryNever),
WithErrorFunc(shouldRetry),
},
objectOptions: []RetryOption{
WithBackoff(gax.Backoff{
Initial: time.Nanosecond,
Max: time.Second,
}),
},
shouldHaveTriesLeft: true,
},
{
name: "object's backoff completely overwrites bucket's backoff",
bucketOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
WithBackoff(gax.Backoff{
Initial: time.Hour,
}),
},
objectOptions: []RetryOption{
WithBackoff(gax.Backoff{
Multiplier: 2,
}),
},
shouldHaveTriesLeft: false,
shouldBeQuickerThan: time.Minute,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(s *testing.T) {
numTries = 2

b := bucket
if len(tc.bucketOptions) > 0 {
b = b.Retryer(tc.bucketOptions...)
}

o := b.Object(uidSpace.New())
if len(tc.objectOptions) > 0 {
o = o.Retryer(tc.objectOptions...)
}

// set a timeout so that tests do not retry longer than expected
ctx, cancel := context.WithTimeout(ctx, tc.shouldBeQuickerThan+time.Second*5)
defer cancel()

start := time.Now()
_, err := o.Attrs(ctx) // object doesn't exist so this should always error
timeTaken := time.Since(start)
if err == nil {
t.Error("Object should not exist but call to attrs does not error")
}

if (numTries > 0 && !tc.shouldHaveTriesLeft) || (numTries <= 0 && tc.shouldHaveTriesLeft) {
t.Errorf("Unexpected number of retries; want tries left: %t, got tries left: %d", tc.shouldHaveTriesLeft, numTries)
}

if tc.shouldBeQuickerThan > 0 && timeTaken > tc.shouldBeQuickerThan {
t.Errorf("Retries took longer than expected; check backoff options.\n"+
"time taken (context may have been cancelled prior to full completion of retries): %s\nmax time expected to take: %s",
timeTaken, tc.shouldBeQuickerThan)
}
})
}
}

type testHelper struct {
t *testing.T
}
Expand Down