New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(pubsublite): Types for resource paths and topic/subscription configs #3026
Merged
Merged
Changes from 2 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
cdda18a
feat(pubsublite): Types for resource paths and topic/subscription con…
tmdiep 4ebbca0
Fix comment for exported constants
tmdiep 030bcf2
Address review comments
tmdiep cc45f14
Fix accidental deletion
tmdiep 1eccee2
Merge branch 'master' into config
tmdiep File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,264 @@ | ||
// Copyright 2020 Google LLC | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
|
||
package pubsublite | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"cloud.google.com/go/internal/optional" | ||
"github.com/golang/protobuf/ptypes" | ||
|
||
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" | ||
fmpb "google.golang.org/genproto/protobuf/field_mask" | ||
) | ||
|
||
// TopicConfig describes the properties of a Google Pub/Sub Lite topic. | ||
// See https://cloud.google.com/pubsub/lite/docs/topics for more information | ||
// about how topics are configured. | ||
type TopicConfig struct { | ||
// The full path of a topic. | ||
Name TopicPath | ||
|
||
// The number of partitions in the topic. Must be at least 1. Cannot be | ||
// changed after creation. | ||
PartitionCount int64 | ||
tmdiep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Publish throughput capacity per partition in MiB/s. | ||
// Must be >= 4 and <= 16. | ||
PublishCapacityMiBPerSec int32 | ||
tmdiep marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Subscribe throughput capacity per partition in MiB/s. | ||
// Must be >= 4 and <= 32. | ||
SubscribeCapacityMiBPerSec int32 | ||
|
||
// The provisioned storage, in bytes, per partition. If the number of bytes | ||
// stored in any of the topic's partitions grows beyond this value, older | ||
// messages will be dropped to make room for newer ones, regardless of the | ||
// value of `RetentionDuration`. | ||
PerPartitionBytes int64 | ||
|
||
// How long a published message is retained. If unset, messages will be | ||
// retained as long as the bytes retained for each partition is below | ||
// `PerPartitionBytes`. | ||
RetentionDuration optional.Duration | ||
} | ||
|
||
func (tc *TopicConfig) toProto() *pb.Topic { | ||
topicpb := &pb.Topic{ | ||
Name: tc.Name.String(), | ||
PartitionConfig: &pb.Topic_PartitionConfig{ | ||
Count: tc.PartitionCount, | ||
Dimension: &pb.Topic_PartitionConfig_Capacity_{ | ||
Capacity: &pb.Topic_PartitionConfig_Capacity{ | ||
PublishMibPerSec: tc.PublishCapacityMiBPerSec, | ||
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec, | ||
}, | ||
}, | ||
}, | ||
RetentionConfig: &pb.Topic_RetentionConfig{ | ||
PerPartitionBytes: tc.PerPartitionBytes, | ||
}, | ||
} | ||
if tc.RetentionDuration != nil { | ||
duration := optional.ToDuration(tc.RetentionDuration) | ||
if duration >= 0 { | ||
topicpb.RetentionConfig.Period = ptypes.DurationProto(duration) | ||
} | ||
} | ||
return topicpb | ||
} | ||
|
||
func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { | ||
name, err := ParseTopicPath(t.GetName()) | ||
if err != nil { | ||
return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName()) | ||
} | ||
|
||
partitionCfg := t.GetPartitionConfig() | ||
retentionCfg := t.GetRetentionConfig() | ||
topic := &TopicConfig{ | ||
Name: name, | ||
PartitionCount: partitionCfg.GetCount(), | ||
PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(), | ||
SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(), | ||
PerPartitionBytes: retentionCfg.GetPerPartitionBytes(), | ||
} | ||
// An unset retention period proto denotes "infinite retention". | ||
if retentionCfg.Period != nil { | ||
period, err := ptypes.Duration(retentionCfg.Period) | ||
if err != nil { | ||
return nil, fmt.Errorf("pubsublite: invalid retention period in topic config: %v", err) | ||
} | ||
topic.RetentionDuration = period | ||
} | ||
return topic, nil | ||
} | ||
|
||
// InfiniteRetention is sentinel used when updating topic configs to clear a | ||
// retention duration (i.e. retain messages as long as there is available | ||
// storage). | ||
const InfiniteRetention = time.Duration(-1) | ||
|
||
// TopicConfigToUpdate specifies the properties to update for a topic. | ||
type TopicConfigToUpdate struct { | ||
// The full path of the topic to update. Required. | ||
Name TopicPath | ||
|
||
// If non-zero, will update the publish throughput capacity per partition. | ||
PublishCapacityMiBPerSec int32 | ||
|
||
// If non-zero, will update the subscribe throughput capacity per partition. | ||
SubscribeCapacityMiBPerSec int32 | ||
|
||
// If non-zero, will update the provisioned storage per partition. | ||
PerPartitionBytes int64 | ||
|
||
// If specified, will update how long a published message is retained. To | ||
// clear a retention duration (i.e. retain messages as long as there is | ||
// available storage), set this to `pubsublite.InfiniteRetention`. | ||
RetentionDuration optional.Duration | ||
} | ||
|
||
func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { | ||
updatedTopic := &pb.Topic{ | ||
Name: tc.Name.String(), | ||
PartitionConfig: &pb.Topic_PartitionConfig{ | ||
Dimension: &pb.Topic_PartitionConfig_Capacity_{ | ||
Capacity: &pb.Topic_PartitionConfig_Capacity{ | ||
PublishMibPerSec: tc.PublishCapacityMiBPerSec, | ||
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec, | ||
}, | ||
}, | ||
}, | ||
RetentionConfig: &pb.Topic_RetentionConfig{ | ||
PerPartitionBytes: tc.PerPartitionBytes, | ||
}, | ||
} | ||
|
||
var fields []string | ||
if tc.PublishCapacityMiBPerSec > 0 { | ||
fields = append(fields, "partition_config.capacity.publish_mib_per_sec") | ||
} | ||
if tc.SubscribeCapacityMiBPerSec > 0 { | ||
fields = append(fields, "partition_config.capacity.subscribe_mib_per_sec") | ||
} | ||
if tc.PerPartitionBytes > 0 { | ||
fields = append(fields, "retention_config.per_partition_bytes") | ||
} | ||
if tc.RetentionDuration != nil { | ||
fields = append(fields, "retention_config.period") | ||
duration := optional.ToDuration(tc.RetentionDuration) | ||
// An unset retention period proto denotes "infinite retention". | ||
if duration >= 0 { | ||
updatedTopic.RetentionConfig.Period = ptypes.DurationProto(duration) | ||
} | ||
} | ||
|
||
return &pb.UpdateTopicRequest{ | ||
Topic: updatedTopic, | ||
UpdateMask: &fmpb.FieldMask{Paths: fields}, | ||
} | ||
} | ||
|
||
// DeliveryRequirement specifies when a subscription should send messages to | ||
// subscribers relative to persistence in storage. | ||
type DeliveryRequirement int32 | ||
|
||
const ( | ||
// UnspecifiedDeliveryRequirement represents and unset delivery requirement. | ||
UnspecifiedDeliveryRequirement = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVERY_REQUIREMENT_UNSPECIFIED) | ||
|
||
// DeliverImmediately means the server will not not wait for a published | ||
// message to be successfully written to storage before delivering it to | ||
// subscribers. | ||
DeliverImmediately = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_IMMEDIATELY) | ||
|
||
// DeliverAfterStored means the server will not deliver a published message to | ||
// subscribers until the message has been successfully written to storage. | ||
// This will result in higher end-to-end latency, but consistent delivery. | ||
DeliverAfterStored = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_AFTER_STORED) | ||
) | ||
|
||
// SubscriptionConfig describes the properties of a Google Pub/Sub Lite | ||
// subscription, which is attached to a topic. | ||
// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more | ||
// information about how subscriptions are configured. | ||
type SubscriptionConfig struct { | ||
// The full path of a subscription. | ||
Name SubscriptionPath | ||
|
||
// The name of the topic this subscription is attached to. This cannot be | ||
// changed after creation. | ||
Topic TopicPath | ||
|
||
// Whether a message should be delivered to subscribers immediately after it | ||
// has been published or after it has been successfully written to storage. | ||
DeliveryRequirement DeliveryRequirement | ||
} | ||
|
||
func (sc *SubscriptionConfig) toProto() *pb.Subscription { | ||
return &pb.Subscription{ | ||
Name: sc.Name.String(), | ||
Topic: sc.Topic.String(), | ||
DeliveryConfig: &pb.Subscription_DeliveryConfig{ | ||
DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement), | ||
}, | ||
} | ||
} | ||
|
||
func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) { | ||
name, err := ParseSubscriptionPath(s.GetName()) | ||
if err != nil { | ||
return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName()) | ||
} | ||
topic, err := ParseTopicPath(s.GetTopic()) | ||
if err != nil { | ||
return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic()) | ||
} | ||
return &SubscriptionConfig{ | ||
Name: name, | ||
Topic: topic, | ||
DeliveryRequirement: DeliveryRequirement(s.GetDeliveryConfig().GetDeliveryRequirement().Number()), | ||
}, nil | ||
} | ||
|
||
// SubscriptionConfigToUpdate specifies the properties to update for a | ||
// subscription. | ||
type SubscriptionConfigToUpdate struct { | ||
// The full path of the subscription to update. Required. | ||
Name SubscriptionPath | ||
|
||
// If non-zero, updates the message delivery requirement. | ||
DeliveryRequirement DeliveryRequirement | ||
} | ||
|
||
func (sc *SubscriptionConfigToUpdate) toUpdateRequest() *pb.UpdateSubscriptionRequest { | ||
updatedSubs := &pb.Subscription{ | ||
Name: sc.Name.String(), | ||
DeliveryConfig: &pb.Subscription_DeliveryConfig{ | ||
DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement), | ||
}, | ||
} | ||
|
||
var fields []string | ||
if sc.DeliveryRequirement > 0 { | ||
fields = append(fields, "delivery_config.delivery_requirement") | ||
} | ||
|
||
return &pb.UpdateSubscriptionRequest{ | ||
Subscription: updatedSubs, | ||
UpdateMask: &fmpb.FieldMask{Paths: fields}, | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it worth inlining all of these structs that are facades of resource names would be useful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... Unlike most other services, Pub/Sub Lite resources are zonal, hence topics are globally uniquely identified by {project, zone, id} and it's common to have to copy all components across to different objects, function args, etc. I think keeping the fields together as a full path is convenient and less error prone.