From 3bd59958e0c06d2655b67fcb5410668db3c52af0 Mon Sep 17 00:00:00 2001 From: Brenna N Epp Date: Mon, 10 Jan 2022 10:35:25 -0600 Subject: [PATCH] feat(storage): add rpo (turbo replication) support (#5003) --- storage/bucket.go | 67 ++++++++++++++++++++++++++++++++++++++++++ storage/bucket_test.go | 10 +++++++ 2 files changed, 77 insertions(+) diff --git a/storage/bucket.go b/storage/bucket.go index b564f58e919..9d145f03963 100644 --- a/storage/bucket.go +++ b/storage/bucket.go @@ -524,6 +524,12 @@ type BucketAttrs struct { // The project number of the project the bucket belongs to. // This field is read-only. ProjectNumber uint64 + + // RPO configures the Recovery Point Objective (RPO) policy of the bucket. + // Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket. + // See https://cloud.google.com/storage/docs/managing-turbo-replication for + // more information. + RPO RPO } // BucketPolicyOnly is an alias for UniformBucketLevelAccess. @@ -791,6 +797,7 @@ func newBucket(b *raw.Bucket) (*BucketAttrs, error) { Etag: b.Etag, LocationType: b.LocationType, ProjectNumber: b.ProjectNumber, + RPO: toRPO(b), }, nil } @@ -843,6 +850,7 @@ func (b *BucketAttrs) toRawBucket() *raw.Bucket { Logging: b.Logging.toRawBucketLogging(), Website: b.Website.toRawBucketWebsite(), IamConfiguration: bktIAM, + Rpo: b.RPO.String(), } } @@ -952,6 +960,12 @@ type BucketAttrsToUpdate struct { // See https://cloud.google.com/storage/docs/json_api/v1/buckets/patch. PredefinedDefaultObjectACL string + // RPO configures the Recovery Point Objective (RPO) policy of the bucket. + // Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket. + // See https://cloud.google.com/storage/docs/managing-turbo-replication for + // more information. + RPO RPO + setLabels map[string]string deleteLabels map[string]bool } @@ -1064,7 +1078,10 @@ func (ua *BucketAttrsToUpdate) toRawBucket() *raw.Bucket { rb.DefaultObjectAcl = nil rb.ForceSendFields = append(rb.ForceSendFields, "DefaultObjectAcl") } + rb.StorageClass = ua.StorageClass + rb.Rpo = ua.RPO.String() + if ua.setLabels != nil || ua.deleteLabels != nil { rb.Labels = map[string]string{} for k, v := range ua.setLabels { @@ -1410,6 +1427,20 @@ func toPublicAccessPrevention(b *raw.BucketIamConfiguration) PublicAccessPrevent } } +func toRPO(b *raw.Bucket) RPO { + if b == nil { + return RPOUnknown + } + switch b.Rpo { + case rpoDefault: + return RPODefault + case rpoAsyncTurbo: + return RPOAsyncTurbo + default: + return RPOUnknown + } +} + // Objects returns an iterator over the objects in the bucket that match the // Query q. If q is nil, no filtering is done. Objects will be iterated over // lexicographically by name. @@ -1624,3 +1655,39 @@ func (it *BucketIterator) fetch(pageSize int, pageToken string) (token string, e } return resp.NextPageToken, nil } + +// RPO (Recovery Point Objective) configures the turbo replication feature. See +// https://cloud.google.com/storage/docs/managing-turbo-replication for more information. +type RPO int + +const ( + // RPOUnknown is a zero value. It may be returned from bucket.Attrs() if RPO + // is not present in the bucket metadata, that is, the bucket is not dual-region. + // This value is also used if the RPO field is not set in a call to GCS. + RPOUnknown RPO = iota + + // RPODefault represents default replication. It is used to reset RPO on an + // existing bucket that has this field set to RPOAsyncTurbo. Otherwise it + // is equivalent to RPOUnknown, and is always ignored. This value is valid + // for dual- or multi-region buckets. + RPODefault + + // RPOAsyncTurbo represents turbo replication and is used to enable Turbo + // Replication on a bucket. This value is only valid for dual-region buckets. + RPOAsyncTurbo + + rpoUnknown string = "" + rpoDefault = "DEFAULT" + rpoAsyncTurbo = "ASYNC_TURBO" +) + +func (rpo RPO) String() string { + switch rpo { + case RPODefault: + return rpoDefault + case RPOAsyncTurbo: + return rpoAsyncTurbo + default: + return rpoUnknown + } +} diff --git a/storage/bucket_test.go b/storage/bucket_test.go index 2b4491766fc..f1fc510bd2b 100644 --- a/storage/bucket_test.go +++ b/storage/bucket_test.go @@ -45,6 +45,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) { UniformBucketLevelAccess: UniformBucketLevelAccess{Enabled: true}, PublicAccessPrevention: PublicAccessPreventionEnforced, VersioningEnabled: false, + RPO: RPOAsyncTurbo, // should be ignored: MetaGeneration: 39, Created: time.Now(), @@ -126,6 +127,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) { PublicAccessPrevention: "enforced", }, Versioning: nil, // ignore VersioningEnabled if false + Rpo: rpoAsyncTurbo, Labels: map[string]string{"label": "value"}, Cors: []*raw.BucketCors{ { @@ -279,6 +281,14 @@ func TestBucketAttrsToRawBucket(t *testing.T) { t.Errorf(msg) } + // Test that setting RPO to default is propagated in the proto. + attrs.RPO = RPODefault + got = attrs.toRawBucket() + want.Rpo = rpoDefault + if msg := testutil.Diff(got, want); msg != "" { + t.Errorf(msg) + } + // Re-enable UBLA and confirm that it does not affect the PAP setting. attrs.UniformBucketLevelAccess = UniformBucketLevelAccess{Enabled: true} got = attrs.toRawBucket()