Skip to content

Commit

Permalink
feat(storage): add retry idempotency configs (#5165)
Browse files Browse the repository at this point in the history
Adds idempotency options to ObjectHandle.Retryer. This allows
users to choose whether only idempotent operations are retried
(default), all operations are retried, or nothing is retried.

In addition, change ObjectHandle.Update and ObjectHandle.Delete
to retry only when idempotency conditions are present by default.
  • Loading branch information
tritone committed Nov 23, 2021
1 parent a12641a commit fa5e458
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 19 deletions.
9 changes: 7 additions & 2 deletions storage/invoke.go
Expand Up @@ -27,15 +27,20 @@ import (
"google.golang.org/grpc/status"
)

var defaultRetry *retryConfig = &retryConfig{}

// run determines whether a retry is necessary based on the config and
// idempotency information. It then calls the function with or without retries
// as appropriate, using the configured settings.
func run(ctx context.Context, call func() error, retry *retryConfig, isIdempotent bool) error {
if !isIdempotent {
if retry == nil {
retry = defaultRetry
}
if (retry.policy == RetryIdempotent && !isIdempotent) || retry.policy == RetryNever {
return call()
}
bo := gax.Backoff{}
if retry != nil && retry.backoff != nil {
if retry.backoff != nil {
bo.Multiplier = retry.backoff.Multiplier
bo.Initial = retry.backoff.Initial
bo.Max = retry.backoff.Max
Expand Down
55 changes: 51 additions & 4 deletions storage/storage.go
Expand Up @@ -1018,8 +1018,11 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) (
}
var obj *raw.Object
setClientHeader(call.Header())
// TODO: configure conditional idempotency.
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true)
var isIdempotent bool
if o.conds != nil && o.conds.MetagenerationMatch != 0 {
isIdempotent = true
}
err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, isIdempotent)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return nil, ErrObjectNotExist
Expand Down Expand Up @@ -1083,8 +1086,13 @@ func (o *ObjectHandle) Delete(ctx context.Context) error {
}
// Encryption doesn't apply to Delete.
setClientHeader(call.Header())
// TODO: configure conditional idempotency.
err := run(ctx, func() error { return call.Do() }, o.retry, true)
var isIdempotent bool
// Delete is idempotent if GenerationMatch or Generation have been passed in.
// The default generation is negative to get the latest version of the object.
if (o.conds != nil && o.conds.GenerationMatch != 0) || o.gen >= 0 {
isIdempotent = true
}
err := run(ctx, func() error { return call.Do() }, o.retry, isIdempotent)
var e *googleapi.Error
if ok := xerrors.As(err, &e); ok && e.Code == http.StatusNotFound {
return ErrObjectNotExist
Expand Down Expand Up @@ -1816,8 +1824,47 @@ func (wb *withBackoff) apply(config *retryConfig) {
config.backoff = &wb.backoff
}

// RetryPolicy describes the available policies for which operations should be
// retried. The default is `RetryIdempotent`.
type RetryPolicy int

const (
// RetryIdempotent causes only idempotent operations to be retried when the
// service returns a transient error. Using this policy, fully idempotent
// operations (such as `ObjectHandle.Attrs()`) will always be retried.
// Conditionally idempotent operations (for example `ObjectHandle.Update()`)
// will be retried only if the necessary conditions have been supplied (in
// the case of `ObjectHandle.Update()` this would mean supplying a
// `Conditions.MetagenerationMatch` condition is required).
RetryIdempotent RetryPolicy = iota

// RetryAlways causes all operations to be retried when the service returns a
// transient error, regardless of idempotency considerations.
RetryAlways

// RetryNever causes the client to not perform retries on failed operations.
RetryNever
)

// WithPolicy allows the configuration of which operations should be performed
// with retries for transient errors.
func WithPolicy(policy RetryPolicy) RetryOption {
return &withPolicy{
policy: policy,
}
}

type withPolicy struct {
policy RetryPolicy
}

func (ws *withPolicy) apply(config *retryConfig) {
config.policy = ws.policy
}

type retryConfig struct {
backoff *gax.Backoff
policy RetryPolicy
}

// composeSourceObj wraps a *raw.ComposeRequestSourceObjects, but adds the methods
Expand Down
42 changes: 29 additions & 13 deletions storage/storage_test.go
Expand Up @@ -802,30 +802,46 @@ func TestRetryer(t *testing.T) {
want: &retryConfig{},
},
{
name: "set all backoff options",
name: "set all options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
return o.Retryer(
WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}),
WithPolicy(RetryAlways))
},
want: &retryConfig{
backoff: &gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}))
},
policy: RetryAlways,
},
want: &retryConfig{&gax.Backoff{
Initial: 2 * time.Second,
Max: 30 * time.Second,
Multiplier: 3,
}},
},
{
name: "set some backoff options",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithBackoff(gax.Backoff{
return o.Retryer(
WithBackoff(gax.Backoff{
Multiplier: 3,
}))
},
want: &retryConfig{
backoff: &gax.Backoff{
Multiplier: 3,
}))
}},
},
{
name: "set policy only",
call: func(o *ObjectHandle) *ObjectHandle {
return o.Retryer(WithPolicy(RetryNever))
},
want: &retryConfig{
policy: RetryNever,
},
want: &retryConfig{&gax.Backoff{
Multiplier: 3,
}},
},
}
for _, tc := range testCases {
Expand Down

0 comments on commit fa5e458

Please sign in to comment.