Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add retry config to IAM handle and reader #5223

Merged
merged 4 commits into from Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion storage/emulator_test.sh
Expand Up @@ -56,7 +56,7 @@ function cleanup() {
trap cleanup EXIT

# TODO: move to passing once fixed
FAILING=( "buckets.setIamPolicy"
FAILING=(
"objects.insert"
)
# TODO: remove regex once all tests are passing
Expand All @@ -69,6 +69,7 @@ PASSING=( "buckets.list"
"buckets.update"
"buckets.patch"
"buckets.getIamPolicy"
"buckets.setIamPolicy"
"buckets.testIamPermissions"
"buckets.lockRetentionPolicy"
"objects.copy"
Expand Down
15 changes: 9 additions & 6 deletions storage/iam.go
Expand Up @@ -29,13 +29,15 @@ func (b *BucketHandle) IAM() *iam.Handle {
return iam.InternalNewHandleClient(&iamClient{
raw: b.c.raw,
userProject: b.userProject,
retry: b.retry,
}, b.name)
}

// iamClient implements the iam.client interface.
type iamClient struct {
raw *raw.Service
userProject string
retry *retryConfig
}

func (c *iamClient) Get(ctx context.Context, resource string) (p *iampb.Policy, err error) {
Expand All @@ -52,10 +54,10 @@ func (c *iamClient) GetWithVersion(ctx context.Context, resource string, request
call.UserProject(c.userProject)
}
var rp *raw.Policy
err = runWithRetry(ctx, func() error {
err = run(ctx, func() error {
rp, err = call.Context(ctx).Do()
return err
})
}, c.retry, true)
if err != nil {
return nil, err
}
Expand All @@ -72,10 +74,11 @@ func (c *iamClient) Set(ctx context.Context, resource string, p *iampb.Policy) (
if c.userProject != "" {
call.UserProject(c.userProject)
}
return runWithRetry(ctx, func() error {
isIdempotent := len(p.Etag) > 0
return run(ctx, func() error {
_, err := call.Context(ctx).Do()
return err
})
}, c.retry, isIdempotent)
}

func (c *iamClient) Test(ctx context.Context, resource string, perms []string) (permissions []string, err error) {
Expand All @@ -88,10 +91,10 @@ func (c *iamClient) Test(ctx context.Context, resource string, perms []string) (
call.UserProject(c.userProject)
}
var res *raw.TestIamPermissionsResponse
err = runWithRetry(ctx, func() error {
err = run(ctx, func() error {
res, err = call.Context(ctx).Do()
return err
})
}, c.retry, true)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions storage/reader.go
Expand Up @@ -163,7 +163,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
}

var res *http.Response
err = runWithRetry(ctx, func() error {
err = run(ctx, func() error {
res, err = o.c.hc.Do(req)
if err != nil {
return err
Expand Down Expand Up @@ -210,7 +210,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
gen = gen64
}
return nil
})
}, o.retry, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -483,7 +483,7 @@ func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, lengt
var msg *storagepb.ReadObjectResponse
var err error

err = runWithRetry(cc, func() error {
err = run(cc, func() error {
stream, err = o.c.gc.ReadObject(cc, req)
if err != nil {
return err
Expand All @@ -492,7 +492,7 @@ func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, lengt
msg, err = stream.Recv()

return err
})
}, o.retry, true)
if err != nil {
// Close the stream context we just created to ensure we don't leak
// resources.
Expand Down
7 changes: 4 additions & 3 deletions storage/retry_conformance_test.go
Expand Up @@ -243,10 +243,11 @@ var methods = map[string][]retryFunc{
return err
}

if err := bkt.IAM().SetPolicy(ctx, policy); err != nil {
return err
if !preconditions {
policy.InternalProto.Etag = nil
}
return fmt.Errorf("Etag preconditions not supported")

return bkt.IAM().SetPolicy(ctx, policy)
},
},
"storage.hmacKey.update": {
Expand Down