Skip to content

Commit

Permalink
feat(storage): add retry configurability (#5159)
Browse files Browse the repository at this point in the history
Adds configurability for how/whether the library will retry
operations. This PR is limited to ObjectHandle methods and
configuration only of backoff timing; however I plan on expanding
this to additional settings (using the same RetryOption framework)
and to all methods which make network calls (via adding Retryer
methods to BucketHandle, etc).
  • Loading branch information
tritone committed Nov 23, 2021
1 parent 899ffbf commit a12641a
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 3 deletions.
22 changes: 22 additions & 0 deletions storage/invoke.go
Expand Up @@ -27,6 +27,25 @@ import (
"google.golang.org/grpc/status"
)

// run determines whether a retry is necessary based on the config and
// idempotency information. It then calls the function with or without retries
// as appropriate, using the configured settings.
func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool) error {
if !isIdempotent {
return call()
}
bo := gax.Backoff{}
if retry != nil && retry.backoff != nil {
bo.Multiplier = retry.backoff.Multiplier
bo.Initial = retry.backoff.Initial
bo.Max = retry.backoff.Max
}
return internal.Retry(ctx, bo, func() (stop bool, err error) {
err = call()
return !shouldRetry(err), err
})
}

// runWithRetry calls the function until it returns nil or a non-retryable error, or
// the context is done.
func runWithRetry(ctx context.Context, call func() error) error {
Expand All @@ -43,6 +62,9 @@ func runWithRetry(ctx context.Context, call func() error) error {
}

func shouldRetry(err error) bool {
if err == nil {
return false
}
if err == io.ErrUnexpectedEOF {
return true
}
Expand Down
51 changes: 48 additions & 3 deletions storage/storage.go
Expand Up @@ -41,6 +41,7 @@ import (
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gapic "cloud.google.com/go/storage/internal/apiv2"
"github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
Expand Down Expand Up @@ -852,6 +853,7 @@ type ObjectHandle struct {
encryptionKey []byte // AES-256 key
userProject string // for requester-pays buckets
readCompressed bool // Accept-Encoding: gzip
retry *retryConfig
}

// ACL provides access to the object's access control list.
Expand Down Expand Up @@ -915,7 +917,7 @@ func (o *ObjectHandle) Attrs(ctx context.Context) (attrs *ObjectAttrs, err error
}
var obj *raw.Object
setClientHeader(call.Header())
err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
Expand Down Expand Up @@ -1016,7 +1018,8 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) (
}
var obj *raw.Object
setClientHeader(call.Header())
err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
// TODO: configure conditional idempotency.
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
Expand Down Expand Up @@ -1080,7 +1083,8 @@ func (o *ObjectHandle) Delete(ctx context.Context) error {
}
// Encryption doesn't apply to Delete.
setClientHeader(call.Header())
err := runWithRetry(ctx, func() error { return call.Do() })
// TODO: configure conditional idempotency.
err := run(ctx, func() error { return call.Do() }, o.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return ErrObjectNotExist
Expand Down Expand Up @@ -1775,6 +1779,47 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool
return true
}

// Retryer returns an object handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle {
o2 := *o
retry := &retryConfig{}
for _, opt := range opts {
opt.apply(retry)
}
o2.retry = retry
return &o2
}

// RetryOption allows users to configure non-default retry behavior for API
// calls made to GCS.
type RetryOption interface {
apply(config *retryConfig)
}

// WithBackoff allows configuration of the backoff timing used for retries.
// Available configuration options (Initial, Max and Multiplier) are described
// at https://pkg.go.dev/github.com/googleapis/gax-go/v2#Backoff. If any fields
// are not supplied by the user, gax default values will be used.
func WithBackoff(backoff gax.Backoff) RetryOption {
return &withBackoff{
backoff: backoff,
}
}

type withBackoff struct {
backoff gax.Backoff
}

func (wb *withBackoff) apply(config *retryConfig) {
config.backoff = &wb.backoff
}

type retryConfig struct {
backoff *gax.Backoff
}

// composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods
// that modifyCall searches for by name.
type composeSourceObj struct {
Expand Down
53 changes: 53 additions & 0 deletions storage/storage_test.go
Expand Up @@ -37,6 +37,7 @@ import (
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
raw "google.golang.org/api/storage/v1"
Expand Down Expand Up @@ -785,6 +786,58 @@ func TestConditionErrors(t *testing.T) {
}
}

// Test that ObjectHandle.Retryer correctly configures the retry configuration
// in the ObjectHandle.
func TestRetryer(t *testing.T) {
testCases := []struct {
name string
call func(o *ObjectHandle) *ObjectHandle
want *retryConfig
}{
{
name: "all defaults",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer()
},
want: &retryConfig{},
},
{
name: "set all backoff options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}))
},
want: &retryConfig{&gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}},
},
{
name: "set some backoff options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
Multiplier: 3,
}))
},
want: &retryConfig{&gax.Backoff{
Multiplier: 3,
}},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(s *testing.T) {
o := tc.call(&ObjectHandle{})
if diff := cmp.Diff(o.retry, tc.want, cmp.AllowUnexported(retryConfig{}, gax.Backoff{})); diff != "" {
s.Fatalf("retry not configured correctly: %v", diff)
}
})
}
}

// Test object compose.
func TestObjectCompose(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit a12641a

Please sign in to comment.