Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add retry configurability #5159

Merged
merged 5 commits into from Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions storage/invoke.go
Expand Up @@ -27,6 +27,31 @@ import (
"google.golang.org/grpc/status"
)

// run determines whether a retry is necessary based on the config and
// idempotency information. It then calls the function with or without retries
// as appropriate, using the configured settings.
func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool) error {
if !isIdempotent {
return call()
}
bo := gax.Backoff{}
if retry != nil && retry.backoff != nil {
bo.Multiplier = retry.backoff.Multiplier
bo.Initial = retry.backoff.Initial
bo.Max = retry.backoff.Max
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
}
return internal.Retry(ctx, bo, func() (stop bool, err error) {
err = call()
if err == nil {
return true, nil
}
if shouldRetry(err) {
return false, err
}
return true, err
tritone marked this conversation as resolved.
Show resolved Hide resolved
})
}

// runWithRetry calls the function until it returns nil or a non-retryable error, or
// the context is done.
func runWithRetry(ctx context.Context, call func() error) error {
Expand Down
51 changes: 48 additions & 3 deletions storage/storage.go
Expand Up @@ -41,6 +41,7 @@ import (
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
gapic "cloud.google.com/go/storage/internal/apiv2"
"github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
"golang.org/x/xerrors"
"google.golang.org/api/googleapi"
Expand Down Expand Up @@ -852,6 +853,7 @@ type ObjectHandle struct {
encryptionKey []byte // AES-256 key
userProject string // for requester-pays buckets
readCompressed bool // Accept-Encoding: gzip
retry *retryConfig
}

// ACL provides access to the object's access control list.
Expand Down Expand Up @@ -915,7 +917,7 @@ func (o *ObjectHandle) Attrs(ctx context.Context) (attrs *ObjectAttrs, err error
}
var obj *raw.Object
setClientHeader(call.Header())
err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
Expand Down Expand Up @@ -1016,7 +1018,8 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) (
}
var obj *raw.Object
setClientHeader(call.Header())
err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err })
// TODO: configure conditional idempotency.
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
Expand Down Expand Up @@ -1080,7 +1083,8 @@ func (o *ObjectHandle) Delete(ctx context.Context) error {
}
// Encryption doesn't apply to Delete.
setClientHeader(call.Header())
err := runWithRetry(ctx, func() error { return call.Do() })
// TODO: configure conditional idempotency.
err := run(ctx, func() error { return call.Do() }, o.retry, true)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return ErrObjectNotExist
Expand Down Expand Up @@ -1775,6 +1779,47 @@ func setConditionField(call reflect.Value, name string, value interface{}) bool
return true
}

// Retryer returns an object handle that is configured with custom retry
// behavior as specified by the options that are passed to it. All operations
// on the new handle will use the customized retry configuration.
func (o *ObjectHandle) Retryer(opts ...RetryOption) *ObjectHandle {
o2 := *o
retry := &retryConfig{}
for _, opt := range opts {
opt.apply(retry)
}
o2.retry = retry
return &o2
}

// RetryOption allows users to configure non-default retry behavior for API
// calls made to GCS.
type RetryOption interface {
apply(config *retryConfig)
}

// WithBackoff allows configuration of the backoff timing used for retries.
// Available configuration options (Initial, Max and Multiplier) are described
// at https://pkg.go.dev/github.com/googleapis/gax-go/v2#Backoff. If any fields
// are not supplied by the user, gax default values will be used.
func WithBackoff(backoff gax.Backoff) RetryOption {
return &withBackoff{
backoff: backoff,
}
}

type withBackoff struct {
backoff gax.Backoff
}

func (wb *withBackoff) apply(config *retryConfig) {
config.backoff = &wb.backoff
}

type retryConfig struct {
backoff *gax.Backoff
}

// composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods
// that modifyCall searches for by name.
type composeSourceObj struct {
Expand Down
53 changes: 53 additions & 0 deletions storage/storage_test.go
Expand Up @@ -37,6 +37,7 @@ import (
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
raw "google.golang.org/api/storage/v1"
Expand Down Expand Up @@ -785,6 +786,58 @@ func TestConditionErrors(t *testing.T) {
}
}

// Test that ObjectHandle.Retryer correctly configures the retry configuration
// in the ObjectHandle.
func TestRetryer(t *testing.T) {
testCases := []struct {
name string
call func(o *ObjectHandle) *ObjectHandle
want *retryConfig
}{
{
name: "all defaults",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer()
},
want: &retryConfig{},
},
{
name: "set all backoff options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}))
},
want: &retryConfig{&gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}},
},
{
name: "set some backoff options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
Multiplier: 3,
}))
},
want: &retryConfig{&gax.Backoff{
Multiplier: 3,
}},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(s *testing.T) {
o := tc.call(&ObjectHandle{})
if diff := cmp.Diff(o.retry, tc.want, cmp.AllowUnexported(retryConfig{}, gax.Backoff{})); diff != "" {
s.Fatalf("retry not configured correctly: %v", diff)
}
})
}
}

// Test object compose.
func TestObjectCompose(t *testing.T) {
t.Parallel()
Expand Down