Skip to content

Commit

Permalink
feat(storage): add rpo (turbo replication) support (#5003)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennaEpp committed Jan 10, 2022
1 parent 0c2722c commit 3bd5995
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 0 deletions.
67 changes: 67 additions & 0 deletions storage/bucket.go
Expand Up @@ -524,6 +524,12 @@ type BucketAttrs struct {
// The project number of the project the bucket belongs to.
// This field is read-only.
ProjectNumber uint64

// RPO configures the Recovery Point Objective (RPO) policy of the bucket.
// Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket.
// See https://cloud.google.com/storage/docs/managing-turbo-replication for
// more information.
RPO RPO
}

// BucketPolicyOnly is an alias for UniformBucketLevelAccess.
Expand Down Expand Up @@ -791,6 +797,7 @@ func newBucket(b *raw.Bucket) (*BucketAttrs, error) {
Etag: b.Etag,
LocationType: b.LocationType,
ProjectNumber: b.ProjectNumber,
RPO: toRPO(b),
}, nil
}

Expand Down Expand Up @@ -843,6 +850,7 @@ func (b *BucketAttrs) toRawBucket() *raw.Bucket {
Logging: b.Logging.toRawBucketLogging(),
Website: b.Website.toRawBucketWebsite(),
IamConfiguration: bktIAM,
Rpo: b.RPO.String(),
}
}

Expand Down Expand Up @@ -952,6 +960,12 @@ type BucketAttrsToUpdate struct {
// See https://cloud.google.com/storage/docs/json_api/v1/buckets/patch.
PredefinedDefaultObjectACL string

// RPO configures the Recovery Point Objective (RPO) policy of the bucket.
// Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket.
// See https://cloud.google.com/storage/docs/managing-turbo-replication for
// more information.
RPO RPO

setLabels map[string]string
deleteLabels map[string]bool
}
Expand Down Expand Up @@ -1064,7 +1078,10 @@ func (ua *BucketAttrsToUpdate) toRawBucket() *raw.Bucket {
rb.DefaultObjectAcl = nil
rb.ForceSendFields = append(rb.ForceSendFields, "DefaultObjectAcl")
}

rb.StorageClass = ua.StorageClass
rb.Rpo = ua.RPO.String()

if ua.setLabels != nil || ua.deleteLabels != nil {
rb.Labels = map[string]string{}
for k, v := range ua.setLabels {
Expand Down Expand Up @@ -1410,6 +1427,20 @@ func toPublicAccessPrevention(b *raw.BucketIamConfiguration) PublicAccessPrevent
}
}

func toRPO(b *raw.Bucket) RPO {
if b == nil {
return RPOUnknown
}
switch b.Rpo {
case rpoDefault:
return RPODefault
case rpoAsyncTurbo:
return RPOAsyncTurbo
default:
return RPOUnknown
}
}

// Objects returns an iterator over the objects in the bucket that match the
// Query q. If q is nil, no filtering is done. Objects will be iterated over
// lexicographically by name.
Expand Down Expand Up @@ -1624,3 +1655,39 @@ func (it *BucketIterator) fetch(pageSize int, pageToken string) (token string, e
}
return resp.NextPageToken, nil
}

// RPO (Recovery Point Objective) configures the turbo replication feature. See
// https://cloud.google.com/storage/docs/managing-turbo-replication for more information.
type RPO int

const (
// RPOUnknown is a zero value. It may be returned from bucket.Attrs() if RPO
// is not present in the bucket metadata, that is, the bucket is not dual-region.
// This value is also used if the RPO field is not set in a call to GCS.
RPOUnknown RPO = iota

// RPODefault represents default replication. It is used to reset RPO on an
// existing bucket that has this field set to RPOAsyncTurbo. Otherwise it
// is equivalent to RPOUnknown, and is always ignored. This value is valid
// for dual- or multi-region buckets.
RPODefault

// RPOAsyncTurbo represents turbo replication and is used to enable Turbo
// Replication on a bucket. This value is only valid for dual-region buckets.
RPOAsyncTurbo

rpoUnknown string = ""
rpoDefault = "DEFAULT"
rpoAsyncTurbo = "ASYNC_TURBO"
)

func (rpo RPO) String() string {
switch rpo {
case RPODefault:
return rpoDefault
case RPOAsyncTurbo:
return rpoAsyncTurbo
default:
return rpoUnknown
}
}
10 changes: 10 additions & 0 deletions storage/bucket_test.go
Expand Up @@ -45,6 +45,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
UniformBucketLevelAccess: UniformBucketLevelAccess{Enabled: true},
PublicAccessPrevention: PublicAccessPreventionEnforced,
VersioningEnabled: false,
RPO: RPOAsyncTurbo,
// should be ignored:
MetaGeneration: 39,
Created: time.Now(),
Expand Down Expand Up @@ -126,6 +127,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
PublicAccessPrevention: "enforced",
},
Versioning: nil, // ignore VersioningEnabled if false
Rpo: rpoAsyncTurbo,
Labels: map[string]string{"label": "value"},
Cors: []*raw.BucketCors{
{
Expand Down Expand Up @@ -279,6 +281,14 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
t.Errorf(msg)
}

// Test that setting RPO to default is propagated in the proto.
attrs.RPO = RPODefault
got = attrs.toRawBucket()
want.Rpo = rpoDefault
if msg := testutil.Diff(got, want); msg != "" {
t.Errorf(msg)
}

// Re-enable UBLA and confirm that it does not affect the PAP setting.
attrs.UniformBucketLevelAccess = UniformBucketLevelAccess{Enabled: true}
got = attrs.toRawBucket()
Expand Down

0 comments on commit 3bd5995

Please sign in to comment.