Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add retry config to BucketHandle #5170

Merged
merged 10 commits into from Dec 2, 2021
46 changes: 36 additions & 10 deletions storage/bucket.go
Expand Up @@ -44,6 +44,7 @@ type BucketHandle struct {
defaultObjectACL ACLHandle
conds *BucketConditions
userProject string // project for Requester Pays buckets
retry *retryConfig
}

// Bucket returns a BucketHandle, which provides operations on the named bucket.
Expand Down Expand Up @@ -95,7 +96,7 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck
if attrs != nil && attrs.PredefinedDefaultObjectACL != "" {
req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL)
}
return runWithRetry(ctx, func() error { _, err := req.Context(ctx).Do(); return err })
return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true)
}

// Delete deletes the Bucket.
Expand All @@ -107,7 +108,8 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) {
if err != nil {
return err
}
return runWithRetry(ctx, func() error { return req.Context(ctx).Do() })

return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true)
}

func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) {
Expand Down Expand Up @@ -156,6 +158,7 @@ func (b *BucketHandle) Object(name string) *ObjectHandle {
},
gen: -1,
userProject: b.userProject,
retry: b.retry.clone(),
}
}

Expand All @@ -169,10 +172,10 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error
return nil, err
}
var resp *raw.Bucket
err = runWithRetry(ctx, func() error {
err = run(ctx, func() error {
resp, err = req.Context(ctx).Do()
return err
})
}, b.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrBucketNotExist
Expand Down Expand Up @@ -210,12 +213,20 @@ func (b *BucketHandle) Update(ctx context.Context, uattrs BucketAttrsToUpdate) (
if uattrs.PredefinedDefaultObjectACL != "" {
req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL)
}
// TODO(jba): retry iff metagen is set?
rb, err := req.Context(ctx).Do()
if err != nil {

isIdempotent := b.conds != nil && b.conds.MetagenerationMatch != 0
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved

var rawBucket *raw.Bucket
call := func() error {
rb, err := req.Context(ctx).Do()
rawBucket = rb
return err
}

if err := run(ctx, call, b.retry, isIdempotent); err != nil {
return nil, err
}
return newBucket(rb)
return newBucket(rawBucket)
}

func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPatchCall, error) {
Expand Down Expand Up @@ -1081,10 +1092,10 @@ func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error {
metageneration = b.conds.MetagenerationMatch
}
req := b.c.raw.Buckets.LockRetentionPolicy(b.name, metageneration)
return runWithRetry(ctx, func() error {
return run(ctx, func() error {
_, err := req.Context(ctx).Do()
return err
})
}, b.retry, true)
}

// applyBucketConds modifies the provided call using the conditions in conds.
Expand Down Expand Up @@ -1367,6 +1378,21 @@ func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator {
return it
}

// Retryer returns a bucket handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
// Retry options set on a object handle will take precedence over options set on
// the bucket handle.
func (b *BucketHandle) Retryer(opts ...RetryOption) *BucketHandle {
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
b2 := *b
retry := &retryConfig{}
for _, opt := range opts {
opt.apply(retry)
}
b2.retry = retry
return &b2
}

// An ObjectIterator is an iterator over ObjectAttrs.
//
// Note: This iterator is not safe for concurrent operations without explicit synchronization.
Expand Down
88 changes: 88 additions & 0 deletions storage/bucket_test.go
Expand Up @@ -23,6 +23,7 @@ import (

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
)
Expand Down Expand Up @@ -717,3 +718,90 @@ func TestNewBucket(t *testing.T) {
t.Errorf("got=-, want=+:\n%s", diff)
}
}

func TestBucketRetryer(t *testing.T) {
testCases := []struct {
name string
call func(b *BucketHandle) *BucketHandle
want *retryConfig
}{
{
name: "all defaults",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer()
},
want: &retryConfig{},
},
{
name: "set all options",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}),
WithPolicy(RetryAlways),
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
backoff: &gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
},
policy: RetryAlways,
shouldRetry: func(err error) bool { return false },
},
},
{
name: "set some backoff options",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithBackoff(gax.Backoff{
Multiplier: 3,
}))
},
want: &retryConfig{
backoff: &gax.Backoff{
Multiplier: 3,
}},
},
{
name: "set policy only",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(WithPolicy(RetryNever))
},
want: &retryConfig{
policy: RetryNever,
},
},
{
name: "set ErrorFunc only",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
shouldRetry: func(err error) bool { return false },
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(s *testing.T) {
b := tc.call(&BucketHandle{})
if diff := cmp.Diff(
b.retry,
tc.want,
cmp.AllowUnexported(retryConfig{}, gax.Backoff{}),
// ErrorFunc cannot be compared directly, but we check if both are
// either nil or non-nil.
cmp.Comparer(func(a, b func(err error) bool) bool {
return (a == nil && b == nil) || (a != nil && b != nil)
}),
); diff != "" {
s.Fatalf("retry not configured correctly: %v", diff)
}
})
}
}
32 changes: 31 additions & 1 deletion storage/storage.go
Expand Up @@ -1790,9 +1790,18 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool
// Retryer returns an object handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
// These retry options will merge with the bucket's retryer (if set) for the
// returned handle. Options passed into this method will take precedence over
// options on the bucket's retryer.
func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle {
o2 := *o
retry := &retryConfig{}
var retry *retryConfig
if o.retry != nil {
// merge the options with the existing retry
retry = o.retry
} else {
retry = &retryConfig{}
}
for _, opt := range opts {
opt.apply(retry)
}
Expand Down Expand Up @@ -1897,6 +1906,27 @@ type retryConfig struct {
shouldRetry func(err error) bool
}

func (r *retryConfig) clone() *retryConfig {
if r == nil {
return nil
}

var bo *gax.Backoff
if r.backoff != nil {
bo = &gax.Backoff{
Initial: r.backoff.Initial,
Max: r.backoff.Max,
Multiplier: r.backoff.Multiplier,
}
}

return &retryConfig{
backoff: bo,
policy: r.policy,
shouldRetry: r.shouldRetry,
}
}

// composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods
// that modifyCall searches for by name.
type composeSourceObj struct {
Expand Down