Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): add rpo (turbo replication) support #5003

Merged
merged 12 commits into from Jan 10, 2022
57 changes: 57 additions & 0 deletions storage/bucket.go
Expand Up @@ -461,6 +461,12 @@ type BucketAttrs struct {
// The project number of the project the bucket belongs to.
// This field is read-only.
ProjectNumber uint64

// RPO configures the Recovery Point Objective (RPO) policy of the bucket.
// Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket.
// See https://cloud.google.com/storage/docs/managing-turbo-replication for
// more information.
RPO RPO
}

// BucketPolicyOnly is an alias for UniformBucketLevelAccess.
Expand Down Expand Up @@ -728,6 +734,7 @@ func newBucket(b *raw.Bucket) (*BucketAttrs, error) {
Etag: b.Etag,
LocationType: b.LocationType,
ProjectNumber: b.ProjectNumber,
RPO: toRPO(b),
}, nil
}

Expand Down Expand Up @@ -780,6 +787,7 @@ func (b *BucketAttrs) toRawBucket() *raw.Bucket {
Logging: b.Logging.toRawBucketLogging(),
Website: b.Website.toRawBucketWebsite(),
IamConfiguration: bktIAM,
Rpo: b.RPO.String(),
}
}

Expand Down Expand Up @@ -889,6 +897,12 @@ type BucketAttrsToUpdate struct {
// See https://cloud.google.com/storage/docs/json_api/v1/buckets/patch.
PredefinedDefaultObjectACL string

// RPO configures the Recovery Point Objective (RPO) policy of the bucket.
// Set to RPOAsyncTurbo to turn on Turbo Replication for a bucket.
// See https://cloud.google.com/storage/docs/managing-turbo-replication for
// more information.
RPO RPO

setLabels map[string]string
deleteLabels map[string]bool
}
Expand Down Expand Up @@ -1001,7 +1015,10 @@ func (ua *BucketAttrsToUpdate) toRawBucket() *raw.Bucket {
rb.DefaultObjectAcl = nil
rb.ForceSendFields = append(rb.ForceSendFields, "DefaultObjectAcl")
}

rb.StorageClass = ua.StorageClass
rb.Rpo = ua.RPO.String()

if ua.setLabels != nil || ua.deleteLabels != nil {
rb.Labels = map[string]string{}
for k, v := range ua.setLabels {
Expand Down Expand Up @@ -1347,6 +1364,20 @@ func toPublicAccessPrevention(b *raw.BucketIamConfiguration) PublicAccessPrevent
}
}

func toRPO(b *raw.Bucket) RPO {
if b == nil {
return RPODefault
}
switch b.Rpo {
case rpoDefault:
return RPODefault
case rpoAsyncTurbo:
return RPOAsyncTurbo
default:
return RPODefault
}
}

// Objects returns an iterator over the objects in the bucket that match the
// Query q. If q is nil, no filtering is done. Objects will be iterated over
// lexicographically by name.
Expand Down Expand Up @@ -1534,3 +1565,29 @@ func (it *BucketIterator) fetch(pageSize int, pageToken string) (token string, e
}
return resp.NextPageToken, nil
}

type RPO int
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved

const (
// See: https://cloud.google.com/storage/docs/managing-turbo-replication

// RPODefault is used to reset RPO on an existing bucket with RPO set to RPOAsyncTurbo.
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
// Otherwise RPODefault is equivalent to the RPO field being absent, and is always ignored
RPODefault RPO = iota
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can RPO be set to default? If so, I think we'd need another unknown-type value to be the zero value. Zeros do not propagate through to the service generally. And I think we'd want to distinguish in bucket attrs between DEFAULT vs. an unknown value (if there is a new enum addition) and/or nothing being returned for that field.

See what I have for PublicAccessPrevention, I'm assuming this should probably operate the same way.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for some clarity, in the case of a single region bucket, RPO will not be present in the metadata. For a multiregion bucket, RPO will always come back as DEFAULT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good note, I've changed this as suggested.

// Set RPO to RPOAsyncTurbo to enable Turbo Replication on a bucket
RPOAsyncTurbo

rpoDefault string = "DEFAULT"
rpoAsyncTurbo = "ASYNC_TURBO"
)

func (rpo RPO) String() string {
switch rpo {
case RPODefault:
return rpoDefault
case RPOAsyncTurbo:
return rpoAsyncTurbo
default:
return rpoDefault
}
}
10 changes: 10 additions & 0 deletions storage/bucket_test.go
Expand Up @@ -44,6 +44,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
UniformBucketLevelAccess: UniformBucketLevelAccess{Enabled: true},
PublicAccessPrevention: PublicAccessPreventionEnforced,
VersioningEnabled: false,
RPO: RPOAsyncTurbo,
// should be ignored:
MetaGeneration: 39,
Created: time.Now(),
Expand Down Expand Up @@ -125,6 +126,7 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
PublicAccessPrevention: "enforced",
},
Versioning: nil, // ignore VersioningEnabled if false
Rpo: rpoAsyncTurbo,
Labels: map[string]string{"label": "value"},
Cors: []*raw.BucketCors{
{
Expand Down Expand Up @@ -278,6 +280,14 @@ func TestBucketAttrsToRawBucket(t *testing.T) {
t.Errorf(msg)
}

// Test that setting RPO to default is propagated in the proto.
attrs.RPO = RPODefault
got = attrs.toRawBucket()
want.Rpo = rpoDefault
if msg := testutil.Diff(got, want); msg != "" {
t.Errorf(msg)
}

// Re-enable UBLA and confirm that it does not affect the PAP setting.
attrs.UniformBucketLevelAccess = UniformBucketLevelAccess{Enabled: true}
got = attrs.toRawBucket()
Expand Down