Skip to content

Commit

Permalink
feat(storage): add retry config to BucketHandle (#5170)
Browse files Browse the repository at this point in the history
* Adds Retryer configurability to BucketHandle
* Adds retrying to BucketHandle.Update, including defaulting to retry only when idempotency conditions are present
* Adds retry config to all direct methods on BucketHandle
* Adds integration test for retry configs
Bucket config will merge with object config, with the object's config overriding the options it sets.
  • Loading branch information
BrennaEpp committed Dec 2, 2021
1 parent 359f5b1 commit b2b5476
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 12 deletions.
46 changes: 36 additions & 10 deletions storage/bucket.go
Expand Up @@ -44,6 +44,7 @@ type BucketHandle struct {
defaultObjectACL ACLHandle
conds *BucketConditions
userProject string // project for Requester Pays buckets
retry *retryConfig
}

// Bucket returns a BucketHandle, which provides operations on the named bucket.
Expand Down Expand Up @@ -95,7 +96,7 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck
if attrs != nil && attrs.PredefinedDefaultObjectACL != "" {
req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL)
}
return runWithRetry(ctx, func() error { _, err := req.Context(ctx).Do(); return err })
return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true)
}

// Delete deletes the Bucket.
Expand All @@ -107,7 +108,8 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) {
if err != nil {
return err
}
return runWithRetry(ctx, func() error { return req.Context(ctx).Do() })

return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true)
}

func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) {
Expand Down Expand Up @@ -156,6 +158,7 @@ func (b *BucketHandle) Object(name string) *ObjectHandle {
},
gen: -1,
userProject: b.userProject,
retry: b.retry.clone(),
}
}

Expand All @@ -169,10 +172,10 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error
return nil, err
}
var resp *raw.Bucket
err = runWithRetry(ctx, func() error {
err = run(ctx, func() error {
resp, err = req.Context(ctx).Do()
return err
})
}, b.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrBucketNotExist
Expand Down Expand Up @@ -210,12 +213,20 @@ func (b *BucketHandle) Update(ctx context.Context, uattrs BucketAttrsToUpdate) (
if uattrs.PredefinedDefaultObjectACL != "" {
req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL)
}
// TODO(jba): retry iff metagen is set?
rb, err := req.Context(ctx).Do()
if err != nil {

isIdempotent := b.conds != nil && b.conds.MetagenerationMatch != 0

var rawBucket *raw.Bucket
call := func() error {
rb, err := req.Context(ctx).Do()
rawBucket = rb
return err
}

if err := run(ctx, call, b.retry, isIdempotent); err != nil {
return nil, err
}
return newBucket(rb)
return newBucket(rawBucket)
}

func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPatchCall, error) {
Expand Down Expand Up @@ -1127,10 +1138,10 @@ func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error {
metageneration = b.conds.MetagenerationMatch
}
req := b.c.raw.Buckets.LockRetentionPolicy(b.name, metageneration)
return runWithRetry(ctx, func() error {
return run(ctx, func() error {
_, err := req.Context(ctx).Do()
return err
})
}, b.retry, true)
}

// applyBucketConds modifies the provided call using the conditions in conds.
Expand Down Expand Up @@ -1413,6 +1424,21 @@ func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator {
return it
}

// Retryer returns a bucket handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
// Retry options set on a object handle will take precedence over options set on
// the bucket handle.
func (b *BucketHandle) Retryer(opts ...RetryOption) *BucketHandle {
b2 := *b
retry := &retryConfig{}
for _, opt := range opts {
opt.apply(retry)
}
b2.retry = retry
return &b2
}

// An ObjectIterator is an iterator over ObjectAttrs.
//
// Note: This iterator is not safe for concurrent operations without explicit synchronization.
Expand Down
88 changes: 88 additions & 0 deletions storage/bucket_test.go
Expand Up @@ -23,6 +23,7 @@ import (

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
raw "google.golang.org/api/storage/v1"
)
Expand Down Expand Up @@ -717,3 +718,90 @@ func TestNewBucket(t *testing.T) {
t.Errorf("got=-, want=+:\n%s", diff)
}
}

func TestBucketRetryer(t *testing.T) {
testCases := []struct {
name string
call func(b *BucketHandle) *BucketHandle
want *retryConfig
}{
{
name: "all defaults",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer()
},
want: &retryConfig{},
},
{
name: "set all options",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}),
WithPolicy(RetryAlways),
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
backoff: &gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
},
policy: RetryAlways,
shouldRetry: func(err error) bool { return false },
},
},
{
name: "set some backoff options",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithBackoff(gax.Backoff{
Multiplier: 3,
}))
},
want: &retryConfig{
backoff: &gax.Backoff{
Multiplier: 3,
}},
},
{
name: "set policy only",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(WithPolicy(RetryNever))
},
want: &retryConfig{
policy: RetryNever,
},
},
{
name: "set ErrorFunc only",
call: func(b *BucketHandle) *BucketHandle {
return b.Retryer(
WithErrorFunc(func(err error) bool { return false }))
},
want: &retryConfig{
shouldRetry: func(err error) bool { return false },
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(s *testing.T) {
b := tc.call(&BucketHandle{})
if diff := cmp.Diff(
b.retry,
tc.want,
cmp.AllowUnexported(retryConfig{}, gax.Backoff{}),
// ErrorFunc cannot be compared directly, but we check if both are
// either nil or non-nil.
cmp.Comparer(func(a, b func(err error) bool) bool {
return (a == nil && b == nil) || (a != nil && b != nil)
}),
); diff != "" {
s.Fatalf("retry not configured correctly: %v", diff)
}
})
}
}
32 changes: 31 additions & 1 deletion storage/storage.go
Expand Up @@ -1790,9 +1790,18 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool
// Retryer returns an object handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
// These retry options will merge with the bucket's retryer (if set) for the
// returned handle. Options passed into this method will take precedence over
// options on the bucket's retryer.
func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle {
o2 := *o
retry := &retryConfig{}
var retry *retryConfig
if o.retry != nil {
// merge the options with the existing retry
retry = o.retry
} else {
retry = &retryConfig{}
}
for _, opt := range opts {
opt.apply(retry)
}
Expand Down Expand Up @@ -1897,6 +1906,27 @@ type retryConfig struct {
shouldRetry func(err error) bool
}

func (r *retryConfig) clone() *retryConfig {
if r == nil {
return nil
}

var bo *gax.Backoff
if r.backoff != nil {
bo = &gax.Backoff{
Initial: r.backoff.Initial,
Max: r.backoff.Max,
Multiplier: r.backoff.Multiplier,
}
}

return &retryConfig{
backoff: bo,
policy: r.policy,
shouldRetry: r.shouldRetry,
}
}

// composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods
// that modifyCall searches for by name.
type composeSourceObj struct {
Expand Down

0 comments on commit b2b5476

Please sign in to comment.