From a12641a43de60ed123f9ac604e9017c83a9654df Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 23 Nov 2021 15:16:17 -0500 Subject: [PATCH] feat(storage): add retry configurability (#5159) Adds configurability for how/whether the library will retry operations. This PR is limited to ObjectHandle methods and configuration only of backoff timing; however I plan on expanding this to additional settings (using the same RetryOption framework) and to all methods which make network calls (via adding Retryer methods to BucketHandle, etc). --- storage/invoke.go | 22 +++++++++++++++++ storage/storage.go | 51 ++++++++++++++++++++++++++++++++++++--- storage/storage_test.go | 53 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 3 deletions(-) diff --git a/storage/invoke.go b/storage/invoke.go index 0c44c2db14c..bbefe14ce14 100644 --- a/storage/invoke.go +++ b/storage/invoke.go @@ -27,6 +27,25 @@ import ( "google.golang.org/grpc/status" ) +// run determines whether a retry is necessary based on the config and +// idempotency information. It then calls the function with or without retries +// as appropriate, using the configured settings. +func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool) error { + if !isIdempotent { + return call() + } + bo := gax.Backoff{} + if retry != nil && retry.backoff != nil { + bo.Multiplier = retry.backoff.Multiplier + bo.Initial = retry.backoff.Initial + bo.Max = retry.backoff.Max + } + return internal.Retry(ctx, bo, func() (stop bool, err error) { + err = call() + return !shouldRetry(err), err + }) +} + // runWithRetry calls the function until it returns nil or a non-retryable error, or // the context is done. func runWithRetry(ctx context.Context, call func() error) error { @@ -43,6 +62,9 @@ func runWithRetry(ctx context.Context, call func() error) error { } func shouldRetry(err error) bool { + if err == nil { + return false + } if err == io.ErrUnexpectedEOF { return true } diff --git a/storage/storage.go b/storage/storage.go index 8d6277be26e..7a21ae3d4c4 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -41,6 +41,7 @@ import ( "cloud.google.com/go/internal/trace" "cloud.google.com/go/internal/version" gapic "cloud.google.com/go/storage/internal/apiv2" + "github.com/googleapis/gax-go/v2" "golang.org/x/oauth2/google" "golang.org/x/xerrors" "google.golang.org/api/googleapi" @@ -852,6 +853,7 @@ type ObjectHandle struct { encryptionKey []byte // AES-256 key userProject string // for requester-pays buckets readCompressed bool // Accept-Encoding: gzip + retry *retryConfig } // ACL provides access to the object's access control list. @@ -915,7 +917,7 @@ func (o *ObjectHandle) Attrs(ctx context.Context) (attrs *ObjectAttrs, err error } var obj *raw.Object setClientHeader(call.Header()) - err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err }) + err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true) var e *googleapi.Error if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist @@ -1016,7 +1018,8 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) ( } var obj *raw.Object setClientHeader(call.Header()) - err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err }) + // TODO: configure conditional idempotency. + err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true) var e *googleapi.Error if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound { return nil, ErrObjectNotExist @@ -1080,7 +1083,8 @@ func (o *ObjectHandle) Delete(ctx context.Context) error { } // Encryption doesn't apply to Delete. setClientHeader(call.Header()) - err := runWithRetry(ctx, func() error { return call.Do() }) + // TODO: configure conditional idempotency. + err := run(ctx, func() error { return call.Do() }, o.retry, true) var e *googleapi.Error if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound { return ErrObjectNotExist @@ -1775,6 +1779,47 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool return true } +// Retryer returns an object handle that is configured with custom retry +// behavior as specified by the options that are passed to it. All operations +// on the new handle will use the customized retry configuration. +func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle { + o2 := *o + retry := &retryConfig{} + for _, opt := range opts { + opt.apply(retry) + } + o2.retry = retry + return &o2 +} + +// RetryOption allows users to configure non-default retry behavior for API +// calls made to GCS. +type RetryOption interface { + apply(config *retryConfig) +} + +// WithBackoff allows configuration of the backoff timing used for retries. +// Available configuration options (Initial, Max and Multiplier) are described +// at https://pkg.go.dev/github.com/googleapis/gax-go/v2#Backoff. If any fields +// are not supplied by the user, gax default values will be used. +func WithBackoff(backoff gax.Backoff) RetryOption { + return &withBackoff{ + backoff: backoff, + } +} + +type withBackoff struct { + backoff gax.Backoff +} + +func (wb *withBackoff) apply(config *retryConfig) { + config.backoff = &wb.backoff +} + +type retryConfig struct { + backoff *gax.Backoff +} + // composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods // that modifyCall searches for by name. type composeSourceObj struct { diff --git a/storage/storage_test.go b/storage/storage_test.go index 2467b7b2ce6..0fa6a368b9f 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -37,6 +37,7 @@ import ( "cloud.google.com/go/iam" "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp" + "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" "google.golang.org/api/option" raw "google.golang.org/api/storage/v1" @@ -785,6 +786,58 @@ func TestConditionErrors(t *testing.T) { } } +// Test that ObjectHandle.Retryer correctly configures the retry configuration +// in the ObjectHandle. +func TestRetryer(t *testing.T) { + testCases := []struct { + name string + call func(o *ObjectHandle) *ObjectHandle + want *retryConfig + }{ + { + name: "all defaults", + call: func(o *ObjectHandle) *ObjectHandle { + return o.Retryer() + }, + want: &retryConfig{}, + }, + { + name: "set all backoff options", + call: func(o *ObjectHandle) *ObjectHandle { + return o.Retryer(WithBackoff(gax.Backoff{ + Initial: 2 * time.Second, + Max: 30 * time.Second, + Multiplier: 3, + })) + }, + want: &retryConfig{&gax.Backoff{ + Initial: 2 * time.Second, + Max: 30 * time.Second, + Multiplier: 3, + }}, + }, + { + name: "set some backoff options", + call: func(o *ObjectHandle) *ObjectHandle { + return o.Retryer(WithBackoff(gax.Backoff{ + Multiplier: 3, + })) + }, + want: &retryConfig{&gax.Backoff{ + Multiplier: 3, + }}, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(s *testing.T) { + o := tc.call(&ObjectHandle{}) + if diff := cmp.Diff(o.retry, tc.want, cmp.AllowUnexported(retryConfig{}, gax.Backoff{})); diff != "" { + s.Fatalf("retry not configured correctly: %v", diff) + } + }) + } +} + // Test object compose. func TestObjectCompose(t *testing.T) { t.Parallel()