Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add retry config to BucketHandle #5170

Merged
merged 10 commits into from Dec 2, 2021
16 changes: 10 additions & 6 deletions storage/bucket.go
Expand Up @@ -96,7 +96,7 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck
if attrs != nil && attrs.PredefinedDefaultObjectACL != "" {
req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL)
}
return runWithRetry(ctx, func() error { _, err := req.Context(ctx).Do(); return err })
return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true)
}

// Delete deletes the Bucket.
Expand All @@ -108,7 +108,8 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) {
if err != nil {
return err
}
return runWithRetry(ctx, func() error { return req.Context(ctx).Do() })

return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true)
}

func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) {
Expand Down Expand Up @@ -157,6 +158,7 @@ func (b *BucketHandle) Object(name string) *ObjectHandle {
},
gen: -1,
userProject: b.userProject,
retry: b.retry.clone(),
}
}

Expand All @@ -170,10 +172,10 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error
return nil, err
}
var resp *raw.Bucket
err = runWithRetry(ctx, func() error {
err = run(ctx, func() error {
resp, err = req.Context(ctx).Do()
return err
})
}, b.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrBucketNotExist
Expand Down Expand Up @@ -1090,10 +1092,10 @@ func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error {
metageneration = b.conds.MetagenerationMatch
}
req := b.c.raw.Buckets.LockRetentionPolicy(b.name, metageneration)
return runWithRetry(ctx, func() error {
return run(ctx, func() error {
_, err := req.Context(ctx).Do()
return err
})
}, b.retry, true)
}

// applyBucketConds modifies the provided call using the conditions in conds.
Expand Down Expand Up @@ -1379,6 +1381,8 @@ func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator {
// Retryer returns a bucket handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
// Retry options set on a object handle will take precedence over options set on
// the bucket handle.
func (b *BucketHandle) Retryer(opts ...RetryOption) *BucketHandle {
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
b2 := *b
retry := &retryConfig{}
Expand Down
142 changes: 142 additions & 0 deletions storage/integration_test.go
Expand Up @@ -51,6 +51,7 @@ import (
"cloud.google.com/go/internal/uid"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
Expand Down Expand Up @@ -4448,6 +4449,147 @@ func findTestCredentials(ctx context.Context, envVar string, scopes ...string) (
return transport.Creds(ctx, opts...)
}

func TestIntegration_RetryConfig(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this test is a bit hacky since it depends on an unrealistic shouldRetry method. Do we really need an integration test? I feel like it's probably sufficient to just to write a unit test that verifies that the configs are what we expect them to be, if options are set on either or both of the BucketHandle and ObjectHandle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. switched it to unit test

ctx := context.Background()
client := testConfig(ctx, t)
defer client.Close()

bucket := client.Bucket(bucketName)

var numTries int
// with numTries = 2, this function sets the config to retry once (if at all)
shouldRetry := func(err error) bool {
numTries--
return numTries > 0
}

testCases := []struct {
name string
bucketOptions []RetryOption
objectOptions []RetryOption
shouldHaveTriesLeft bool
shouldBeQuickerThan time.Duration // backoff options should have a large difference in values so that this condition can be checked
}{
{
name: "object retryer is used",
objectOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: false,
},
{
name: "bucket retryer is used",
bucketOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: false,
},
{
name: "object retryer overrides bucket retryer",
bucketOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
objectOptions: []RetryOption{
WithPolicy(RetryNever),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: true,
},
{
name: "object retryer overrides bucket retryer backoff options",
bucketOptions: []RetryOption{
WithBackoff(gax.Backoff{
Initial: time.Minute,
Max: time.Hour,
Multiplier: 6,
}),
WithErrorFunc(shouldRetry),
},
objectOptions: []RetryOption{
WithBackoff(gax.Backoff{
Initial: time.Nanosecond,
Max: time.Microsecond,
}),
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
},
shouldHaveTriesLeft: false,
shouldBeQuickerThan: time.Second,
},
{
name: "object retryer does not override bucket retryer if option is not set",
bucketOptions: []RetryOption{
WithPolicy(RetryNever),
WithErrorFunc(shouldRetry),
},
objectOptions: []RetryOption{
WithBackoff(gax.Backoff{
Initial: time.Nanosecond,
Max: time.Second,
}),
},
shouldHaveTriesLeft: true,
},
{
name: "object's backoff completely overwrites bucket's backoff",
bucketOptions: []RetryOption{
WithPolicy(RetryAlways),
WithErrorFunc(shouldRetry),
WithBackoff(gax.Backoff{
Initial: time.Hour,
}),
},
objectOptions: []RetryOption{
WithBackoff(gax.Backoff{
Multiplier: 2,
}),
},
shouldHaveTriesLeft: false,
shouldBeQuickerThan: time.Minute,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(s *testing.T) {
numTries = 2

b := bucket
if len(tc.bucketOptions) > 0 {
b = b.Retryer(tc.bucketOptions...)
}

o := b.Object(uidSpace.New())
if len(tc.objectOptions) > 0 {
o = o.Retryer(tc.objectOptions...)
}

// set a timeout so that tests do not retry longer than expected
ctx, cancel := context.WithTimeout(ctx, tc.shouldBeQuickerThan+time.Second*5)
defer cancel()

start := time.Now()
_, err := o.Attrs(ctx) // object doesn't exist so this should always error
timeTaken := time.Since(start)
if err == nil {
t.Error("Object should not exist but call to attrs does not error")
}

if (numTries > 0 && !tc.shouldHaveTriesLeft) || (numTries <= 0 && tc.shouldHaveTriesLeft) {
t.Errorf("Unexpected number of retries; want tries left: %t, got tries left: %d", tc.shouldHaveTriesLeft, numTries)
}

if tc.shouldBeQuickerThan > 0 && timeTaken > tc.shouldBeQuickerThan {
t.Errorf("Retries took longer than expected; check backoff options.\n"+
"time taken (context may have been cancelled prior to full completion of retries): %s\nmax time expected to take: %s",
timeTaken, tc.shouldBeQuickerThan)
}
})
}
}

type testHelper struct {
t *testing.T
}
Expand Down
30 changes: 30 additions & 0 deletions storage/storage.go
Expand Up @@ -1790,9 +1790,16 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool
// Retryer returns an object handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
// These retry options will merge with the bucket's retryer (if set) for the
// returned handle. Options passed into this method will take precedence over
// options on the bucket's retryer.
func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle {
o2 := *o
retry := &retryConfig{}
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
if o.retry != nil {
// this will merge the options with the existing retry
retry = o.retry
}
for _, opt := range opts {
opt.apply(retry)
}
Expand Down Expand Up @@ -1897,6 +1904,29 @@ type retryConfig struct {
shouldRetry func(err error) bool
}

func (r *retryConfig) clone() *retryConfig {
if r == nil {
return nil
}

var bo *gax.Backoff
if r.backoff == nil {
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
bo = nil
} else {
bo = &gax.Backoff{
Initial: r.backoff.Initial,
Max: r.backoff.Max,
Multiplier: r.backoff.Multiplier,
}
}

return &retryConfig{
backoff: bo,
policy: r.policy,
shouldRetry: r.shouldRetry,
}
}

// composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods
// that modifyCall searches for by name.
type composeSourceObj struct {
Expand Down