Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add retry idempotency configs #5165

Merged
merged 2 commits into from Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 7 additions & 2 deletions storage/invoke.go
Expand Up @@ -27,15 +27,20 @@ import (
"google.golang.org/grpc/status"
)

var defaultRetry *retryConfig = &retryConfig{}

// run determines whether a retry is necessary based on the config and
// idempotency information. It then calls the function with or without retries
// as appropriate, using the configured settings.
func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool) error {
if !isIdempotent {
if retry == nil {
retry = defaultRetry
}
if (retry.policy == RetryIdempotent && !isIdempotent) || retry.policy == RetryNever {
return call()
}
bo := gax.Backoff{}
if retry != nil && retry.backoff != nil {
if retry.backoff != nil {
bo.Multiplier = retry.backoff.Multiplier
bo.Initial = retry.backoff.Initial
bo.Max = retry.backoff.Max
Expand Down
55 changes: 51 additions & 4 deletions storage/storage.go
Expand Up @@ -1018,8 +1018,11 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) (
}
var obj *raw.Object
setClientHeader(call.Header())
// TODO: configure conditional idempotency.
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true)
var isIdempotent bool
if o.conds != nil && o.conds.MetagenerationMatch != 0 {
isIdempotent = true
}
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, isIdempotent)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
Expand Down Expand Up @@ -1083,8 +1086,13 @@ func (o *ObjectHandle) Delete(ctx context.Context) error {
}
// Encryption doesn't apply to Delete.
setClientHeader(call.Header())
// TODO: configure conditional idempotency.
err := run(ctx, func() error { return call.Do() }, o.retry, true)
var isIdempotent bool
// Delete is idempotent if GenerationMatch or Generation have been passed in.
// The default generation is negative to get the latest version of the object.
if (o.conds != nil && o.conds.GenerationMatch != 0) || o.gen >= 0 {
isIdempotent = true
}
err := run(ctx, func() error { return call.Do() }, o.retry, isIdempotent)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return ErrObjectNotExist
Expand Down Expand Up @@ -1816,8 +1824,47 @@ func (wb *withBackoff) apply(config *retryConfig) {
config.backoff = &wb.backoff
}

// RetryPolicy describes the available policies for which operations should be
// retried. The default is `RetryIdempotent`.
type RetryPolicy int

const (
// RetryIdempotent causes only idempotent operations to be retried when the
// service returns a transient error. Using this policy, fully idempotent
// operations (such as `ObjectHandle.Attrs()`) will always be retried.
// Conditionally idempotent operations (for example `ObjectHandle.Update()`)
// will be retried only if the necessary conditions have been supplied (in
// the case of `ObjectHandle.Update()` this would mean supplying a
// `Conditions.MetagenerationMatch` condition is required).
RetryIdempotent RetryPolicy = iota

// RetryAlways causes all operations to be retried when the service returns a
// transient error, regardless of idempotency considerations.
RetryAlways

// RetryNever causes the client to not perform retries on failed operations.
RetryNever
)

// WithPolicy allows the configuration of which operations should be performed
// with retries for transient errors.
func WithPolicy(policy RetryPolicy) RetryOption {
return &withPolicy{
policy: policy,
}
}

type withPolicy struct {
policy RetryPolicy
}

func (ws *withPolicy) apply(config *retryConfig) {
config.policy = ws.policy
}

type retryConfig struct {
backoff *gax.Backoff
policy RetryPolicy
}

// composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods
Expand Down
42 changes: 29 additions & 13 deletions storage/storage_test.go
Expand Up @@ -802,30 +802,46 @@ func TestRetryer(t *testing.T) {
want: &retryConfig{},
},
{
name: "set all backoff options",
name: "set all options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
return o.Retryer(
WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}),
WithPolicy(RetryAlways))
},
want: &retryConfig{
backoff: &gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}))
},
policy: RetryAlways,
},
want: &retryConfig{&gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}},
},
{
name: "set some backoff options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
return o.Retryer(
WithBackoff(gax.Backoff{
Multiplier: 3,
}))
},
want: &retryConfig{
backoff: &gax.Backoff{
Multiplier: 3,
}))
}},
},
{
name: "set policy only",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithPolicy(RetryNever))
},
want: &retryConfig{
policy: RetryNever,
},
want: &retryConfig{&gax.Backoff{
Multiplier: 3,
}},
},
}
for _, tc := range testCases {
Expand Down