Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): Types for resource paths and topic/subscription configs #3026

Merged
merged 5 commits into from Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
264 changes: 264 additions & 0 deletions pubsublite/config.go
@@ -0,0 +1,264 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"fmt"
"time"

"cloud.google.com/go/internal/optional"
"github.com/golang/protobuf/ptypes"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
fmpb "google.golang.org/genproto/protobuf/field_mask"
)

// TopicConfig describes the properties of a Google Pub/Sub Lite topic.
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
// about how topics are configured.
type TopicConfig struct {
// The full path of a topic.
Name TopicPath
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it worth inlining all of these structs that are facades of resource names would be useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Unlike most other services, Pub/Sub Lite resources are zonal, hence topics are globally uniquely identified by {project, zone, id} and it's common to have to copy all components across to different objects, function args, etc. I think keeping the fields together as a full path is convenient and less error prone.


// The number of partitions in the topic. Must be at least 1. Cannot be
// changed after creation.
PartitionCount int64
tmdiep marked this conversation as resolved.
Show resolved Hide resolved

// Publish throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 16.
PublishCapacityMiBPerSec int32
tmdiep marked this conversation as resolved.
Show resolved Hide resolved

// Subscribe throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 32.
SubscribeCapacityMiBPerSec int32

// The provisioned storage, in bytes, per partition. If the number of bytes
// stored in any of the topic's partitions grows beyond this value, older
// messages will be dropped to make room for newer ones, regardless of the
// value of `RetentionDuration`.
PerPartitionBytes int64

// How long a published message is retained. If unset, messages will be
// retained as long as the bytes retained for each partition is below
// `PerPartitionBytes`.
RetentionDuration optional.Duration
}

func (tc *TopicConfig) toProto() *pb.Topic {
topicpb := &pb.Topic{
Name: tc.Name.String(),
PartitionConfig: &pb.Topic_PartitionConfig{
Count: tc.PartitionCount,
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
},
},
},
RetentionConfig: &pb.Topic_RetentionConfig{
PerPartitionBytes: tc.PerPartitionBytes,
},
}
if tc.RetentionDuration != nil {
duration := optional.ToDuration(tc.RetentionDuration)
if duration >= 0 {
topicpb.RetentionConfig.Period = ptypes.DurationProto(duration)
}
}
return topicpb
}

func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
name, err := ParseTopicPath(t.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName())
}

partitionCfg := t.GetPartitionConfig()
retentionCfg := t.GetRetentionConfig()
topic := &TopicConfig{
Name: name,
PartitionCount: partitionCfg.GetCount(),
PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(),
SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(),
PerPartitionBytes: retentionCfg.GetPerPartitionBytes(),
}
// An unset retention period proto denotes "infinite retention".
if retentionCfg.Period != nil {
period, err := ptypes.Duration(retentionCfg.Period)
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid retention period in topic config: %v", err)
}
topic.RetentionDuration = period
}
return topic, nil
}

// InfiniteRetention is sentinel used when updating topic configs to clear a
// retention duration (i.e. retain messages as long as there is available
// storage).
const InfiniteRetention = time.Duration(-1)

// TopicConfigToUpdate specifies the properties to update for a topic.
type TopicConfigToUpdate struct {
// The full path of the topic to update. Required.
Name TopicPath

// If non-zero, will update the publish throughput capacity per partition.
PublishCapacityMiBPerSec int32

// If non-zero, will update the subscribe throughput capacity per partition.
SubscribeCapacityMiBPerSec int32

// If non-zero, will update the provisioned storage per partition.
PerPartitionBytes int64

// If specified, will update how long a published message is retained. To
// clear a retention duration (i.e. retain messages as long as there is
// available storage), set this to `pubsublite.InfiniteRetention`.
RetentionDuration optional.Duration
}

func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
updatedTopic := &pb.Topic{
Name: tc.Name.String(),
PartitionConfig: &pb.Topic_PartitionConfig{
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
},
},
},
RetentionConfig: &pb.Topic_RetentionConfig{
PerPartitionBytes: tc.PerPartitionBytes,
},
}

var fields []string
if tc.PublishCapacityMiBPerSec > 0 {
fields = append(fields, "partition_config.capacity.publish_mib_per_sec")
}
if tc.SubscribeCapacityMiBPerSec > 0 {
fields = append(fields, "partition_config.capacity.subscribe_mib_per_sec")
}
if tc.PerPartitionBytes > 0 {
fields = append(fields, "retention_config.per_partition_bytes")
}
if tc.RetentionDuration != nil {
fields = append(fields, "retention_config.period")
duration := optional.ToDuration(tc.RetentionDuration)
// An unset retention period proto denotes "infinite retention".
if duration >= 0 {
updatedTopic.RetentionConfig.Period = ptypes.DurationProto(duration)
}
}

return &pb.UpdateTopicRequest{
Topic: updatedTopic,
UpdateMask: &fmpb.FieldMask{Paths: fields},
}
}

// DeliveryRequirement specifies when a subscription should send messages to
// subscribers relative to persistence in storage.
type DeliveryRequirement int32

const (
// UnspecifiedDeliveryRequirement represents and unset delivery requirement.
UnspecifiedDeliveryRequirement = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVERY_REQUIREMENT_UNSPECIFIED)

// DeliverImmediately means the server will not not wait for a published
// message to be successfully written to storage before delivering it to
// subscribers.
DeliverImmediately = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_IMMEDIATELY)

// DeliverAfterStored means the server will not deliver a published message to
// subscribers until the message has been successfully written to storage.
// This will result in higher end-to-end latency, but consistent delivery.
DeliverAfterStored = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_AFTER_STORED)
)

// SubscriptionConfig describes the properties of a Google Pub/Sub Lite
// subscription, which is attached to a topic.
// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more
// information about how subscriptions are configured.
type SubscriptionConfig struct {
// The full path of a subscription.
Name SubscriptionPath

// The name of the topic this subscription is attached to. This cannot be
// changed after creation.
Topic TopicPath

// Whether a message should be delivered to subscribers immediately after it
// has been published or after it has been successfully written to storage.
DeliveryRequirement DeliveryRequirement
}

func (sc *SubscriptionConfig) toProto() *pb.Subscription {
return &pb.Subscription{
Name: sc.Name.String(),
Topic: sc.Topic.String(),
DeliveryConfig: &pb.Subscription_DeliveryConfig{
DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement),
},
}
}

func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) {
name, err := ParseSubscriptionPath(s.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName())
}
topic, err := ParseTopicPath(s.GetTopic())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic())
}
return &SubscriptionConfig{
Name: name,
Topic: topic,
DeliveryRequirement: DeliveryRequirement(s.GetDeliveryConfig().GetDeliveryRequirement().Number()),
}, nil
}

// SubscriptionConfigToUpdate specifies the properties to update for a
// subscription.
type SubscriptionConfigToUpdate struct {
// The full path of the subscription to update. Required.
Name SubscriptionPath

// If non-zero, updates the message delivery requirement.
DeliveryRequirement DeliveryRequirement
}

func (sc *SubscriptionConfigToUpdate) toUpdateRequest() *pb.UpdateSubscriptionRequest {
updatedSubs := &pb.Subscription{
Name: sc.Name.String(),
DeliveryConfig: &pb.Subscription_DeliveryConfig{
DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement),
},
}

var fields []string
if sc.DeliveryRequirement > 0 {
fields = append(fields, "delivery_config.delivery_requirement")
}

return &pb.UpdateSubscriptionRequest{
Subscription: updatedSubs,
UpdateMask: &fmpb.FieldMask{Paths: fields},
}
}