diff --git a/pubsublite/config.go b/pubsublite/config.go new file mode 100644 index 00000000000..4225900de4c --- /dev/null +++ b/pubsublite/config.go @@ -0,0 +1,264 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "fmt" + "time" + + "cloud.google.com/go/internal/optional" + "github.com/golang/protobuf/ptypes" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" + fmpb "google.golang.org/genproto/protobuf/field_mask" +) + +// TopicConfig describes the properties of a Google Pub/Sub Lite topic. +// See https://cloud.google.com/pubsub/lite/docs/topics for more information +// about how topics are configured. +type TopicConfig struct { + // The full path of a topic. + Name TopicPath + + // The number of partitions in the topic. Must be at least 1. Cannot be + // changed after creation. + PartitionCount int64 + + // Publish throughput capacity per partition in MiB/s. + // Must be >= 4 and <= 16. + PublishCapacityMiBPerSec int32 + + // Subscribe throughput capacity per partition in MiB/s. + // Must be >= 4 and <= 32. + SubscribeCapacityMiBPerSec int32 + + // The provisioned storage, in bytes, per partition. If the number of bytes + // stored in any of the topic's partitions grows beyond this value, older + // messages will be dropped to make room for newer ones, regardless of the + // value of `RetentionDuration`. + PerPartitionBytes int64 + + // How long a published message is retained. If unset, messages will be + // retained as long as the bytes retained for each partition is below + // `PerPartitionBytes`. + RetentionDuration optional.Duration +} + +func (tc *TopicConfig) toProto() *pb.Topic { + topicpb := &pb.Topic{ + Name: tc.Name.String(), + PartitionConfig: &pb.Topic_PartitionConfig{ + Count: tc.PartitionCount, + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{ + PublishMibPerSec: tc.PublishCapacityMiBPerSec, + SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec, + }, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{ + PerPartitionBytes: tc.PerPartitionBytes, + }, + } + if tc.RetentionDuration != nil { + duration := optional.ToDuration(tc.RetentionDuration) + if duration >= 0 { + topicpb.RetentionConfig.Period = ptypes.DurationProto(duration) + } + } + return topicpb +} + +func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { + name, err := ParseTopicPath(t.GetName()) + if err != nil { + return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName()) + } + + partitionCfg := t.GetPartitionConfig() + retentionCfg := t.GetRetentionConfig() + topic := &TopicConfig{ + Name: name, + PartitionCount: partitionCfg.GetCount(), + PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(), + SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(), + PerPartitionBytes: retentionCfg.GetPerPartitionBytes(), + } + // An unset retention period proto denotes "infinite retention". + if retentionCfg.Period != nil { + period, err := ptypes.Duration(retentionCfg.Period) + if err != nil { + return nil, fmt.Errorf("pubsublite: invalid retention period in topic config: %v", err) + } + topic.RetentionDuration = period + } + return topic, nil +} + +// InfiniteRetention is sentinel used when updating topic configs to clear a +// retention duration (i.e. retain messages as long as there is available +// storage). +const InfiniteRetention = time.Duration(-1) + +// TopicConfigToUpdate specifies the properties to update for a topic. +type TopicConfigToUpdate struct { + // The full path of the topic to update. Required. + Name TopicPath + + // If non-zero, will update the publish throughput capacity per partition. + PublishCapacityMiBPerSec int32 + + // If non-zero, will update the subscribe throughput capacity per partition. + SubscribeCapacityMiBPerSec int32 + + // If non-zero, will update the provisioned storage per partition. + PerPartitionBytes int64 + + // If specified, will update how long a published message is retained. To + // clear a retention duration (i.e. retain messages as long as there is + // available storage), set this to `pubsublite.InfiniteRetention`. + RetentionDuration optional.Duration +} + +func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { + updatedTopic := &pb.Topic{ + Name: tc.Name.String(), + PartitionConfig: &pb.Topic_PartitionConfig{ + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{ + PublishMibPerSec: tc.PublishCapacityMiBPerSec, + SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec, + }, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{ + PerPartitionBytes: tc.PerPartitionBytes, + }, + } + + var fields []string + if tc.PublishCapacityMiBPerSec > 0 { + fields = append(fields, "partition_config.capacity.publish_mib_per_sec") + } + if tc.SubscribeCapacityMiBPerSec > 0 { + fields = append(fields, "partition_config.capacity.subscribe_mib_per_sec") + } + if tc.PerPartitionBytes > 0 { + fields = append(fields, "retention_config.per_partition_bytes") + } + if tc.RetentionDuration != nil { + fields = append(fields, "retention_config.period") + duration := optional.ToDuration(tc.RetentionDuration) + // An unset retention period proto denotes "infinite retention". + if duration >= 0 { + updatedTopic.RetentionConfig.Period = ptypes.DurationProto(duration) + } + } + + return &pb.UpdateTopicRequest{ + Topic: updatedTopic, + UpdateMask: &fmpb.FieldMask{Paths: fields}, + } +} + +// DeliveryRequirement specifies when a subscription should send messages to +// subscribers relative to persistence in storage. +type DeliveryRequirement int32 + +const ( + // UnspecifiedDeliveryRequirement represents and unset delivery requirement. + UnspecifiedDeliveryRequirement = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVERY_REQUIREMENT_UNSPECIFIED) + + // DeliverImmediately means the server will not not wait for a published + // message to be successfully written to storage before delivering it to + // subscribers. + DeliverImmediately = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_IMMEDIATELY) + + // DeliverAfterStored means the server will not deliver a published message to + // subscribers until the message has been successfully written to storage. + // This will result in higher end-to-end latency, but consistent delivery. + DeliverAfterStored = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_AFTER_STORED) +) + +// SubscriptionConfig describes the properties of a Google Pub/Sub Lite +// subscription, which is attached to a topic. +// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more +// information about how subscriptions are configured. +type SubscriptionConfig struct { + // The full path of a subscription. + Name SubscriptionPath + + // The name of the topic this subscription is attached to. This cannot be + // changed after creation. + Topic TopicPath + + // Whether a message should be delivered to subscribers immediately after it + // has been published or after it has been successfully written to storage. + DeliveryRequirement DeliveryRequirement +} + +func (sc *SubscriptionConfig) toProto() *pb.Subscription { + return &pb.Subscription{ + Name: sc.Name.String(), + Topic: sc.Topic.String(), + DeliveryConfig: &pb.Subscription_DeliveryConfig{ + DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement), + }, + } +} + +func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) { + name, err := ParseSubscriptionPath(s.GetName()) + if err != nil { + return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName()) + } + topic, err := ParseTopicPath(s.GetTopic()) + if err != nil { + return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic()) + } + return &SubscriptionConfig{ + Name: name, + Topic: topic, + DeliveryRequirement: DeliveryRequirement(s.GetDeliveryConfig().GetDeliveryRequirement().Number()), + }, nil +} + +// SubscriptionConfigToUpdate specifies the properties to update for a +// subscription. +type SubscriptionConfigToUpdate struct { + // The full path of the subscription to update. Required. + Name SubscriptionPath + + // If non-zero, updates the message delivery requirement. + DeliveryRequirement DeliveryRequirement +} + +func (sc *SubscriptionConfigToUpdate) toUpdateRequest() *pb.UpdateSubscriptionRequest { + updatedSubs := &pb.Subscription{ + Name: sc.Name.String(), + DeliveryConfig: &pb.Subscription_DeliveryConfig{ + DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement), + }, + } + + var fields []string + if sc.DeliveryRequirement > 0 { + fields = append(fields, "delivery_config.delivery_requirement") + } + + return &pb.UpdateSubscriptionRequest{ + Subscription: updatedSubs, + UpdateMask: &fmpb.FieldMask{Paths: fields}, + } +} diff --git a/pubsublite/config_test.go b/pubsublite/config_test.go new file mode 100644 index 00000000000..b9d8ec80b81 --- /dev/null +++ b/pubsublite/config_test.go @@ -0,0 +1,341 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "github.com/golang/protobuf/proto" + + dpb "github.com/golang/protobuf/ptypes/duration" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" + fmpb "google.golang.org/genproto/protobuf/field_mask" +) + +func TestTopicConfigToProtoConversion(t *testing.T) { + for _, tc := range []struct { + desc string + topicpb *pb.Topic + wantConfig *TopicConfig + wantErr bool + }{ + { + desc: "valid: retention duration set", + topicpb: &pb.Topic{ + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", + PartitionConfig: &pb.Topic_PartitionConfig{ + Count: 2, + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{ + PublishMibPerSec: 6, + SubscribeMibPerSec: 16, + }, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{ + PerPartitionBytes: 1073741824, + Period: &dpb.Duration{ + Seconds: 86400, + Nanos: 600, + }, + }, + }, + wantConfig: &TopicConfig{ + Name: TopicPath{ + Project: "my-proj", + Zone: "us-central1-c", + TopicID: "my-topic"}, + PartitionCount: 2, + PublishCapacityMiBPerSec: 6, + SubscribeCapacityMiBPerSec: 16, + PerPartitionBytes: 1073741824, + RetentionDuration: time.Duration(86400*1e9 + 600), + }, + }, + { + desc: "valid: retention duration unset", + topicpb: &pb.Topic{ + Name: "projects/my-proj/locations/europe-west1-b/topics/my-topic", + PartitionConfig: &pb.Topic_PartitionConfig{ + Count: 3, + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{ + PublishMibPerSec: 4, + SubscribeMibPerSec: 8, + }, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{ + PerPartitionBytes: 4294967296, + }, + }, + wantConfig: &TopicConfig{ + Name: TopicPath{ + Project: "my-proj", + Zone: "europe-west1-b", + TopicID: "my-topic", + }, + PartitionCount: 3, + PublishCapacityMiBPerSec: 4, + SubscribeCapacityMiBPerSec: 8, + PerPartitionBytes: 4294967296, + }, + }, + { + desc: "invalid: topic desc", + topicpb: &pb.Topic{ + Name: "invalid_topic_desc", + }, + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotConfig, gotErr := protoToTopicConfig(tc.topicpb) + if !testutil.Equal(gotConfig, tc.wantConfig) || (gotErr != nil) != tc.wantErr { + t.Errorf("protoToTopicConfig(%v)\ngot (%v, %v)\nwant (%v, err=%v)", tc.topicpb, gotConfig, gotErr, tc.wantConfig, tc.wantErr) + } + + // Check that the config converts back to an identical proto. + if tc.wantConfig != nil { + if gotProto := tc.wantConfig.toProto(); !proto.Equal(gotProto, tc.topicpb) { + t.Errorf("TopicConfig: %v toProto():\ngot: %v\nwant: %v", tc.wantConfig, gotProto, tc.topicpb) + } + } + }) + } +} + +func TestTopicUpdateRequest(t *testing.T) { + for _, tc := range []struct { + desc string + config *TopicConfigToUpdate + want *pb.UpdateTopicRequest + }{ + { + desc: "all fields set", + config: &TopicConfigToUpdate{ + Name: TopicPath{ + Project: "my-proj", + Zone: "us-central1-c", + TopicID: "my-topic", + }, + PublishCapacityMiBPerSec: 4, + SubscribeCapacityMiBPerSec: 12, + PerPartitionBytes: 500000, + RetentionDuration: time.Duration(0), + }, + want: &pb.UpdateTopicRequest{ + Topic: &pb.Topic{ + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", + PartitionConfig: &pb.Topic_PartitionConfig{ + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{ + PublishMibPerSec: 4, + SubscribeMibPerSec: 12, + }, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{ + PerPartitionBytes: 500000, + Period: &dpb.Duration{}, + }, + }, + UpdateMask: &fmpb.FieldMask{ + Paths: []string{ + "partition_config.capacity.publish_mib_per_sec", + "partition_config.capacity.subscribe_mib_per_sec", + "retention_config.per_partition_bytes", + "retention_config.period", + }, + }, + }, + }, + { + desc: "clear retention duration", + config: &TopicConfigToUpdate{ + Name: TopicPath{ + Project: "my-proj", + Zone: "us-central1-c", + TopicID: "my-topic", + }, + RetentionDuration: InfiniteRetention, + }, + want: &pb.UpdateTopicRequest{ + Topic: &pb.Topic{ + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", + PartitionConfig: &pb.Topic_PartitionConfig{ + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{}, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{}, + }, + UpdateMask: &fmpb.FieldMask{ + Paths: []string{ + "retention_config.period", + }, + }, + }, + }, + { + desc: "no fields set", + config: &TopicConfigToUpdate{ + Name: TopicPath{ + Project: "my-proj", + Zone: "us-central1-c", + TopicID: "my-topic", + }, + }, + want: &pb.UpdateTopicRequest{ + Topic: &pb.Topic{ + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", + PartitionConfig: &pb.Topic_PartitionConfig{ + Dimension: &pb.Topic_PartitionConfig_Capacity_{ + Capacity: &pb.Topic_PartitionConfig_Capacity{}, + }, + }, + RetentionConfig: &pb.Topic_RetentionConfig{}, + }, + UpdateMask: &fmpb.FieldMask{}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if got := tc.config.toUpdateRequest(); !proto.Equal(got, tc.want) { + t.Errorf("TopicConfigToUpdate: %v toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want) + } + }) + } +} + +func TestSubscriptionConfigToProtoConversion(t *testing.T) { + for _, tc := range []struct { + desc string + subspb *pb.Subscription + wantConfig *SubscriptionConfig + wantErr bool + }{ + { + desc: "valid", + subspb: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", + DeliveryConfig: &pb.Subscription_DeliveryConfig{ + DeliveryRequirement: pb.Subscription_DeliveryConfig_DELIVER_AFTER_STORED, + }, + }, + wantConfig: &SubscriptionConfig{ + Name: SubscriptionPath{ + Project: "my-proj", + Zone: "us-central1-c", + SubscriptionID: "my-subs", + }, + Topic: TopicPath{ + Project: "my-proj", + Zone: "us-central1-c", + TopicID: "my-topic", + }, + DeliveryRequirement: DeliverAfterStored, + }, + }, + { + desc: "invalid: subscription desc", + subspb: &pb.Subscription{ + Name: "invalid_subscription_desc", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", + }, + wantErr: true, + }, + { + desc: "invalid: topic desc", + subspb: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "invalid_topic_desc", + }, + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotConfig, gotErr := protoToSubscriptionConfig(tc.subspb) + if !testutil.Equal(gotConfig, tc.wantConfig) || (gotErr != nil) != tc.wantErr { + t.Errorf("protoToSubscriptionConfig(%v)\ngot (%v, %v)\nwant (%v, err=%v)", tc.subspb, gotConfig, gotErr, tc.wantConfig, tc.wantErr) + } + + // Check that the config converts back to an identical proto. + if tc.wantConfig != nil { + if gotProto := tc.wantConfig.toProto(); !proto.Equal(gotProto, tc.subspb) { + t.Errorf("SubscriptionConfig: %v toProto():\ngot: %v\nwant: %v", tc.wantConfig, gotProto, tc.subspb) + } + } + }) + } +} + +func TestSubscriptionUpdateRequest(t *testing.T) { + for _, tc := range []struct { + desc string + config *SubscriptionConfigToUpdate + want *pb.UpdateSubscriptionRequest + }{ + { + desc: "all fields set", + config: &SubscriptionConfigToUpdate{ + Name: SubscriptionPath{ + Project: "my-proj", + Zone: "us-central1-c", + SubscriptionID: "my-subs", + }, + DeliveryRequirement: DeliverImmediately, + }, + want: &pb.UpdateSubscriptionRequest{ + Subscription: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + DeliveryConfig: &pb.Subscription_DeliveryConfig{ + DeliveryRequirement: pb.Subscription_DeliveryConfig_DELIVER_IMMEDIATELY, + }, + }, + UpdateMask: &fmpb.FieldMask{ + Paths: []string{ + "delivery_config.delivery_requirement", + }, + }, + }, + }, + { + desc: "no fields set", + config: &SubscriptionConfigToUpdate{ + Name: SubscriptionPath{ + Project: "my-proj", + Zone: "us-central1-c", + SubscriptionID: "my-subs", + }, + }, + want: &pb.UpdateSubscriptionRequest{ + Subscription: &pb.Subscription{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + DeliveryConfig: &pb.Subscription_DeliveryConfig{}, + }, + UpdateMask: &fmpb.FieldMask{}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if got := tc.config.toUpdateRequest(); !proto.Equal(got, tc.want) { + t.Errorf("SubscriptionConfigToUpdate: %v toUpdateRequest():\ngot: %v\nwant: %v", tc.config, got, tc.want) + } + }) + } +} diff --git a/pubsublite/doc.go b/pubsublite/doc.go new file mode 100644 index 00000000000..692ae9be2ad --- /dev/null +++ b/pubsublite/doc.go @@ -0,0 +1,24 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +/* +Package pubsublite provides an interface for publishing and receiving messages +using Google Pub/Sub Lite. + +More information about Google Pub/Sub Lite is available at +https://cloud.google.com/pubsub/lite. + +This library is under development and will not be stable until v1.0.0 has been +released. +*/ +package pubsublite // import "cloud.google.com/go/pubsublite" diff --git a/pubsublite/types.go b/pubsublite/types.go new file mode 100644 index 00000000000..a120c49452c --- /dev/null +++ b/pubsublite/types.go @@ -0,0 +1,143 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "fmt" + "regexp" + "strings" +) + +// LocationPath stores a path consisting of a project and zone. +type LocationPath struct { + // A Google Cloud project. The project ID (e.g. "my-project") or the project + // number (e.g. "987654321") can be provided. + Project string + + // A Google Cloud zone, for example "us-central1-a". + // See https://cloud.google.com/pubsub/lite/docs/locations for the list of + // zones where Google Pub/Sub Lite is available. + Zone string +} + +func (l LocationPath) String() string { + return fmt.Sprintf("projects/%s/locations/%s", l.Project, l.Zone) +} + +// TopicPath stores the full path of a Google Pub/Sub Lite topic. +// See https://cloud.google.com/pubsub/lite/docs/topics for more information. +type TopicPath struct { + // A Google Cloud project. The project ID (e.g. "my-project") or the project + // number (e.g. "987654321") can be provided. + Project string + + // A Google Cloud zone, for example "us-central1-a". + // See https://cloud.google.com/pubsub/lite/docs/locations for the list of + // zones where Google Pub/Sub Lite is available. + Zone string + + // The ID of the Google Pub/Sub Lite topic, for example "my-topic-name". + TopicID string +} + +func (t TopicPath) String() string { + return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID) +} + +// Location returns the path of the parent location. +func (t TopicPath) Location() LocationPath { + return LocationPath{Project: t.Project, Zone: t.Zone} +} + +var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`) + +// ParseTopicPath parses the full path of a Google Pub/Sub Lite topic, which +// should have the format: `projects/{project}/locations/{zone}/topics/{id}`. +func ParseTopicPath(input string) (TopicPath, error) { + parts := topicPathRE.FindStringSubmatch(input) + if len(parts) < 4 { + return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q", input) + } + return TopicPath{Project: parts[1], Zone: parts[2], TopicID: parts[3]}, nil +} + +// SubscriptionPath stores the full path of a Google Pub/Sub Lite subscription. +// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more +// information. +type SubscriptionPath struct { + // A Google Cloud project. The project ID (e.g. "my-project") or the project + // number (e.g. "987654321") can be provided. + Project string + + // A Google Cloud zone. An example zone is "us-central1-a". + // See https://cloud.google.com/pubsub/lite/docs/locations for the list of + // zones where Google Pub/Sub Lite is available. + Zone string + + // The ID of the Google Pub/Sub Lite subscription, for example + // "my-subscription-name". + SubscriptionID string +} + +func (s SubscriptionPath) String() string { + return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID) +} + +// Location returns the path of the parent location. +func (s SubscriptionPath) Location() LocationPath { + return LocationPath{Project: s.Project, Zone: s.Zone} +} + +var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`) + +// ParseSubscriptionPath parses the full path of a Google Pub/Sub Lite +// subscription, which should have the format: +// `projects/{project}/locations/{zone}/subscriptions/{id}`. +func ParseSubscriptionPath(input string) (SubscriptionPath, error) { + parts := subsPathRE.FindStringSubmatch(input) + if len(parts) < 4 { + return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q", input) + } + return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil +} + +// ValidateZone verifies that the `input` string has the format of a valid +// Google Cloud zone. An example zone is "europe-west1-b". +// See https://cloud.google.com/compute/docs/regions-zones for more information. +func ValidateZone(input string) error { + parts := strings.Split(input, "-") + if len(parts) != 3 { + return fmt.Errorf("pubsublite: invalid zone %q", input) + } + return nil +} + +// ValidateRegion verifies that the `input` string has the format of a valid +// Google Cloud region. An example region is "europe-west1". +// See https://cloud.google.com/compute/docs/regions-zones for more information. +func ValidateRegion(input string) error { + parts := strings.Split(input, "-") + if len(parts) != 2 { + return fmt.Errorf("pubsublite: invalid region %q", input) + } + return nil +} + +// ZoneToRegion returns the region that the given zone is in. +func ZoneToRegion(zone string) (string, error) { + if err := ValidateZone(zone); err != nil { + return "", err + } + return zone[0:strings.LastIndex(zone, "-")], nil +} diff --git a/pubsublite/types_test.go b/pubsublite/types_test.go new file mode 100644 index 00000000000..c9a98716434 --- /dev/null +++ b/pubsublite/types_test.go @@ -0,0 +1,225 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import "testing" + +func TestParseTopicPath(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantPath TopicPath + wantErr bool + }{ + { + desc: "valid: topic path", + input: "projects/987654321/locations/europe-west1-d/topics/my-topic", + wantPath: TopicPath{Project: "987654321", Zone: "europe-west1-d", TopicID: "my-topic"}, + }, + { + desc: "invalid: zone", + input: "europe-west1-d", + wantErr: true, + }, + { + desc: "invalid: subscription path", + input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: missing project", + input: "projects//locations/europe-west1-d/topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: missing zone", + input: "projects/987654321/locations//topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: missing topic id", + input: "projects/987654321/locations/europe-west1-d/topics/", + wantErr: true, + }, + { + desc: "invalid: has prefix", + input: "prefix/projects/987654321/locations/europe-west1-d/topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: has suffix", + input: "projects/my-project/locations/us-west1-b/topics/my-topic/subresource/desc", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotPath, gotErr := ParseTopicPath(tc.input) + if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { + t.Errorf("ParseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) + } + }) + } +} + +func TestParseSubscriptionPath(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantPath SubscriptionPath + wantErr bool + }{ + { + desc: "valid: subscription path", + input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs", + wantPath: SubscriptionPath{Project: "987654321", Zone: "europe-west1-d", SubscriptionID: "my-subs"}, + }, + { + desc: "invalid: zone", + input: "europe-west1-d", + wantErr: true, + }, + { + desc: "invalid: topic path", + input: "projects/987654321/locations/europe-west1-d/topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: missing project", + input: "projects//locations/europe-west1-d/subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: missing zone", + input: "projects/987654321/locations//subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: missing subscription id", + input: "projects/987654321/locations/europe-west1-d/subscriptions/", + wantErr: true, + }, + { + desc: "invalid: has prefix", + input: "prefix/projects/987654321/locations/europe-west1-d/subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: has suffix", + input: "projects/my-project/locations/us-west1-b/subscriptions/my-subs/subresource/desc", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotPath, gotErr := ParseSubscriptionPath(tc.input) + if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { + t.Errorf("ParseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) + } + }) + } +} + +func TestValidateZone(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantErr bool + }{ + { + desc: "valid", + input: "us-central1-a", + wantErr: false, + }, + { + desc: "invalid: insufficient dashes", + input: "us-central1", + wantErr: true, + }, + { + desc: "invalid: excess dashes", + input: "us-central1-a-b", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + err := ValidateZone(tc.input) + if (err != nil) != tc.wantErr { + t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr) + } + }) + } +} + +func TestValidateRegion(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantErr bool + }{ + { + desc: "valid", + input: "europe-west1", + wantErr: false, + }, + { + desc: "invalid: insufficient dashes", + input: "europewest1", + wantErr: true, + }, + { + desc: "invalid: excess dashes", + input: "europe-west1-b", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + err := ValidateRegion(tc.input) + if (err != nil) != tc.wantErr { + t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr) + } + }) + } +} + +func TestZoneToRegion(t *testing.T) { + for _, tc := range []struct { + desc string + zone string + wantRegion string + wantErr bool + }{ + { + desc: "valid", + zone: "europe-west1-d", + wantRegion: "europe-west1", + wantErr: false, + }, + { + desc: "invalid: insufficient dashes", + zone: "europe-west1", + wantErr: true, + }, + { + desc: "invalid: no dashes", + zone: "europewest1", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotRegion, gotErr := ZoneToRegion(tc.zone) + if gotRegion != tc.wantRegion || (gotErr != nil) != tc.wantErr { + t.Errorf("ZoneToRegion(%q) = (%v, %v), want (%v, err=%v)", tc.zone, gotRegion, gotErr, tc.wantRegion, tc.wantErr) + } + }) + } +}