Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add rpo (turbo replication) support #5003

Merged
merged 12 commits into from Jan 10, 2022
67 changes: 67 additions & 0 deletions storage/bucket.go
Expand Up @@ -524,6 +524,12 @@ type BucketAttrs struct {
// The project number of the project the bucket belongs to.
// This field is read-only.
ProjectNumber uint64

// RPO configures the Recovery Point Objective (RPO) policy of the bucket.
// Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket.
// See https://cloud.google.com/storage/docs/managing-turbo-replication for
// more information.
RPO RPO
}

// BucketPolicyOnly is an alias for UniformBucketLevelAccess.
Expand Down Expand Up @@ -791,6 +797,7 @@ func newBucket(b *raw.Bucket) (*BucketAttrs, error) {
Etag: b.Etag,
LocationType: b.LocationType,
ProjectNumber: b.ProjectNumber,
RPO: toRPO(b),
}, nil
}

Expand Down Expand Up @@ -843,6 +850,7 @@ func (b *BucketAttrs) toRawBucket() *raw.Bucket {
Logging: b.Logging.toRawBucketLogging(),
Website: b.Website.toRawBucketWebsite(),
IamConfiguration: bktIAM,
Rpo: b.RPO.String(),
}
}

Expand Down Expand Up @@ -952,6 +960,12 @@ type BucketAttrsToUpdate struct {
// See https://cloud.google.com/storage/docs/json_api/v1/buckets/patch.
PredefinedDefaultObjectACL string

// RPO configures the Recovery Point Objective (RPO) policy of the bucket.
// Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket.
// See https://cloud.google.com/storage/docs/managing-turbo-replication for
// more information.
RPO RPO

setLabels map[string]string
deleteLabels map[string]bool
}
Expand Down Expand Up @@ -1064,7 +1078,10 @@ func (ua *BucketAttrsToUpdate) toRawBucket() *raw.Bucket {
rb.DefaultObjectAcl = nil
rb.ForceSendFields = append(rb.ForceSendFields, "DefaultObjectAcl")
}

rb.StorageClass = ua.StorageClass
rb.Rpo = ua.RPO.String()

if ua.setLabels != nil || ua.deleteLabels != nil {
rb.Labels = map[string]string{}
for k, v := range ua.setLabels {
Expand Down Expand Up @@ -1410,6 +1427,20 @@ func toPublicAccessPrevention(b *raw.BucketIamConfiguration) PublicAccessPrevent
}
}

func toRPO(b *raw.Bucket) RPO {
if b == nil {
return RPOUnknown
}
switch b.Rpo {
case rpoDefault:
return RPODefault
case rpoAsyncTurbo:
return RPOAsyncTurbo
default:
return RPOUnknown
}
}

// Objects returns an iterator over the objects in the bucket that match the
// Query q. If q is nil, no filtering is done. Objects will be iterated over
// lexicographically by name.
Expand Down Expand Up @@ -1624,3 +1655,39 @@ func (it *BucketIterator) fetch(pageSize int, pageToken string) (token string, e
}
return resp.NextPageToken, nil
}

// RPO (Recovery Point Objective) configures the turbo replication feature. See
// https://cloud.google.com/storage/docs/managing-turbo-replication for more information.
type RPO int
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved

const (
// RPOUnknown is a zero value. It may be returned from bucket.Attrs() if RPO
// is not present in the bucket metadata, that is, the bucket is not dual-region.
// This value is also used if the RPO field is not set in a call to GCS.
RPOUnknown RPO = iota

// RPODefault represents default replication. It is used to reset RPO on an
// existing bucket that has this field set to RPOAsyncTurbo. Otherwise it
// is equivalent to RPOUnknown, and is always ignored. This value is valid
// for dual- or multi-region buckets.
RPODefault

// RPOAsyncTurbo represents turbo replication and is used to enable Turbo
// Replication on a bucket. This value is only valid for dual-region buckets.
RPOAsyncTurbo

rpoUnknown string = ""
rpoDefault = "DEFAULT"
rpoAsyncTurbo = "ASYNC_TURBO"
)

func (rpo RPO) String() string {
switch rpo {
case RPODefault:
return rpoDefault
case RPOAsyncTurbo:
return rpoAsyncTurbo
default:
return rpoUnknown
}
}
10 changes: 10 additions & 0 deletions storage/bucket_test.go
Expand Up @@ -45,6 +45,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
UniformBucketLevelAccess: UniformBucketLevelAccess{Enabled: true},
PublicAccessPrevention: PublicAccessPreventionEnforced,
VersioningEnabled: false,
RPO: RPOAsyncTurbo,
// should be ignored:
MetaGeneration: 39,
Created: time.Now(),
Expand Down Expand Up @@ -126,6 +127,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
PublicAccessPrevention: "enforced",
},
Versioning: nil, // ignore VersioningEnabled if false
Rpo: rpoAsyncTurbo,
Labels: map[string]string{"label": "value"},
Cors: []*raw.BucketCors{
{
Expand Down Expand Up @@ -279,6 +281,14 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
t.Errorf(msg)
}

// Test that setting RPO to default is propagated in the proto.
attrs.RPO = RPODefault
got = attrs.toRawBucket()
want.Rpo = rpoDefault
if msg := testutil.Diff(got, want); msg != "" {
t.Errorf(msg)
}

// Re-enable UBLA and confirm that it does not affect the PAP setting.
attrs.UniformBucketLevelAccess = UniformBucketLevelAccess{Enabled: true}
got = attrs.toRawBucket()
Expand Down