diff --git a/pubsublite/README.md b/pubsublite/README.md index 70867aa7cab..2d065e96317 100644 --- a/pubsublite/README.md +++ b/pubsublite/README.md @@ -14,7 +14,6 @@ ```go import ( "cloud.google.com/go/pubsub" - "cloud.google.com/go/pubsublite" "cloud.google.com/go/pubsublite/ps" ) ``` @@ -23,13 +22,9 @@ To publish messages to a topic: [snip]:# (publish) ```go -// Create a PublisherClient for topic1. +// Create a PublisherClient for topic1 in zone us-central1-b. // See https://cloud.google.com/pubsub/lite/docs/locations for available zones. -topic := pubsublite.TopicPath{ - Project: "project-id", - Zone: "us-central1-b", - TopicID: "topic1", -} +const topic = "projects/project-id/locations/us-central1-b/topics/topic1", publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) if err != nil { log.Fatal(err) @@ -52,12 +47,8 @@ To receive messages for a subscription: [snip]:# (subscribe) ```go -// Create a SubscriberClient for subscription1. -subscription := pubsublite.SubscriptionPath{ - Project: "project-id", - Zone: "us-central1-b", - SubscriptionID: "subscription1", -} +// Create a SubscriberClient for subscription1 in zone us-central1-b. +const subscription = "projects/project-id/locations/us-central1-b/subscriptions/subscription1" subscriber, err := ps.NewSubscriberClient(ctx, ps.DefaultReceiveSettings, subscription) if err != nil { log.Fatal(err) diff --git a/pubsublite/admin.go b/pubsublite/admin.go index 6929f229bc6..0a58a7c7fbd 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -29,17 +29,18 @@ var ( errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription") ) -// AdminClient provides admin operations for Cloud Pub/Sub Lite resources -// within a Google Cloud region. An AdminClient may be shared by multiple -// goroutines. +// AdminClient provides admin operations for Pub/Sub Lite resources within a +// Google Cloud region. The zone component of resource paths must be within this +// region. See https://cloud.google.com/pubsub/lite/docs/locations for the list +// of zones where Pub/Sub Lite is available. +// +// An AdminClient may be shared by multiple goroutines. type AdminClient struct { admin *vkit.AdminClient } -// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin -// operations for resources within a given region. -// See https://cloud.google.com/pubsub/lite/docs/locations for the list of -// regions and zones where Cloud Pub/Sub Lite is available. +// NewAdminClient creates a new Pub/Sub Lite client to perform admin operations +// for resources within a given region. func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) { if err := wire.ValidateRegion(region); err != nil { return nil, err @@ -54,10 +55,14 @@ func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOpt // CreateTopic creates a new topic from the given config. If the topic already // exists an error will be returned. func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) { + topicPath, err := wire.ParseTopicPath(config.Name) + if err != nil { + return nil, err + } req := &pb.CreateTopicRequest{ - Parent: config.Name.location().String(), + Parent: topicPath.Location().String(), Topic: config.toProto(), - TopicId: config.Name.TopicID, + TopicId: topicPath.TopicID, } topicpb, err := ac.admin.CreateTopic(ctx, req) if err != nil { @@ -69,6 +74,9 @@ func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*To // UpdateTopic updates an existing topic from the given config and returns the // new topic config. UpdateTopic returns an error if no fields were modified. func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) { + if _, err := wire.ParseTopicPath(config.Name); err != nil { + return nil, err + } req := config.toUpdateRequest() if len(req.GetUpdateMask().GetPaths()) == 0 { return nil, errNoTopicFieldsUpdated @@ -80,23 +88,35 @@ func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpda return protoToTopicConfig(topicpb) } -// DeleteTopic deletes a topic. -func (ac *AdminClient) DeleteTopic(ctx context.Context, topic TopicPath) error { - return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic.String()}) +// DeleteTopic deletes a topic. A valid topic path has the format: +// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". +func (ac *AdminClient) DeleteTopic(ctx context.Context, topic string) error { + if _, err := wire.ParseTopicPath(topic); err != nil { + return err + } + return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic}) } -// Topic retrieves the configuration of a topic. -func (ac *AdminClient) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, error) { - topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic.String()}) +// Topic retrieves the configuration of a topic. A valid topic path has the +// format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". +func (ac *AdminClient) Topic(ctx context.Context, topic string) (*TopicConfig, error) { + if _, err := wire.ParseTopicPath(topic); err != nil { + return nil, err + } + topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic}) if err != nil { return nil, err } return protoToTopicConfig(topicpb) } -// TopicPartitions returns the number of partitions for a topic. -func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) { - partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()}) +// TopicPartitions returns the number of partitions for a topic. A valid topic +// path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". +func (ac *AdminClient) TopicPartitions(ctx context.Context, topic string) (int, error) { + if _, err := wire.ParseTopicPath(topic); err != nil { + return 0, err + } + partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic}) if err != nil { return 0, err } @@ -104,38 +124,57 @@ func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (in } // TopicSubscriptions retrieves the list of subscription paths for a topic. -func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic TopicPath) *SubscriptionPathIterator { +// A valid topic path has the format: +// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". +func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic string) *SubscriptionPathIterator { + if _, err := wire.ParseTopicPath(topic); err != nil { + return &SubscriptionPathIterator{err: err} + } return &SubscriptionPathIterator{ - it: ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic.String()}), + it: ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic}), } } // Topics retrieves the list of topic configs for a given project and zone. -func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *TopicIterator { +// A valid parent path has the format: "projects/PROJECT_ID/locations/ZONE". +func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator { + if _, err := wire.ParseLocationPath(parent); err != nil { + return &TopicIterator{err: err} + } return &TopicIterator{ - it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: location.String()}), + it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: parent}), } } // CreateSubscription creates a new subscription from the given config. If the // subscription already exists an error will be returned. func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) { + subsPath, err := wire.ParseSubscriptionPath(config.Name) + if err != nil { + return nil, err + } + if _, err := wire.ParseTopicPath(config.Topic); err != nil { + return nil, err + } req := &pb.CreateSubscriptionRequest{ - Parent: config.Name.location().String(), + Parent: subsPath.Location().String(), Subscription: config.toProto(), - SubscriptionId: config.Name.SubscriptionID, + SubscriptionId: subsPath.SubscriptionID, } subspb, err := ac.admin.CreateSubscription(ctx, req) if err != nil { return nil, err } - return protoToSubscriptionConfig(subspb) + return protoToSubscriptionConfig(subspb), nil } // UpdateSubscription updates an existing subscription from the given config and // returns the new subscription config. UpdateSubscription returns an error if // no fields were modified. func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) { + if _, err := wire.ParseSubscriptionPath(config.Name); err != nil { + return nil, err + } req := config.toUpdateRequest() if len(req.GetUpdateMask().GetPaths()) == 0 { return nil, errNoSubscriptionFieldsUpdated @@ -144,28 +183,41 @@ func (ac *AdminClient) UpdateSubscription(ctx context.Context, config Subscripti if err != nil { return nil, err } - return protoToSubscriptionConfig(subspb) + return protoToSubscriptionConfig(subspb), nil } -// DeleteSubscription deletes a subscription. -func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription SubscriptionPath) error { - return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription.String()}) +// DeleteSubscription deletes a subscription. A valid subscription path has the +// format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". +func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error { + if _, err := wire.ParseSubscriptionPath(subscription); err != nil { + return err + } + return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription}) } -// Subscription retrieves the configuration of a subscription. -func (ac *AdminClient) Subscription(ctx context.Context, subscription SubscriptionPath) (*SubscriptionConfig, error) { - subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription.String()}) +// Subscription retrieves the configuration of a subscription. A valid +// subscription name has the format: +// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". +func (ac *AdminClient) Subscription(ctx context.Context, subscription string) (*SubscriptionConfig, error) { + if _, err := wire.ParseSubscriptionPath(subscription); err != nil { + return nil, err + } + subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription}) if err != nil { return nil, err } - return protoToSubscriptionConfig(subspb) + return protoToSubscriptionConfig(subspb), nil } // Subscriptions retrieves the list of subscription configs for a given project -// and zone. -func (ac *AdminClient) Subscriptions(ctx context.Context, location LocationPath) *SubscriptionIterator { +// and zone. A valid parent path has the format: +// "projects/PROJECT_ID/locations/ZONE". +func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *SubscriptionIterator { + if _, err := wire.ParseLocationPath(parent); err != nil { + return &SubscriptionIterator{err: err} + } return &SubscriptionIterator{ - it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: location.String()}), + it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: parent}), } } @@ -178,12 +230,16 @@ func (ac *AdminClient) Close() error { // TopicIterator is an iterator that returns a list of topic configs. type TopicIterator struct { - it *vkit.TopicIterator + it *vkit.TopicIterator + err error } // Next returns the next topic config. The second return value will be // iterator.Done if there are no more topic configs. func (t *TopicIterator) Next() (*TopicConfig, error) { + if t.err != nil { + return nil, t.err + } topicpb, err := t.it.Next() if err != nil { return nil, err @@ -194,31 +250,41 @@ func (t *TopicIterator) Next() (*TopicConfig, error) { // SubscriptionIterator is an iterator that returns a list of subscription // configs. type SubscriptionIterator struct { - it *vkit.SubscriptionIterator + it *vkit.SubscriptionIterator + err error } // Next returns the next subscription config. The second return value will be // iterator.Done if there are no more subscription configs. func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) { + if s.err != nil { + return nil, s.err + } subspb, err := s.it.Next() if err != nil { return nil, err } - return protoToSubscriptionConfig(subspb) + return protoToSubscriptionConfig(subspb), nil } // SubscriptionPathIterator is an iterator that returns a list of subscription // paths. type SubscriptionPathIterator struct { - it *vkit.StringIterator + it *vkit.StringIterator + err error } -// Next returns the next subscription path. The second return value will be -// iterator.Done if there are no more subscription paths. -func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) { +// Next returns the next subscription path, which has format: +// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". The +// second return value will be iterator.Done if there are no more subscription +// paths. +func (sp *SubscriptionPathIterator) Next() (string, error) { + if sp.err != nil { + return "", sp.err + } subsPath, err := sp.it.Next() if err != nil { - return SubscriptionPath{}, err + return "", err } - return parseSubscriptionPath(subsPath) + return subsPath, nil } diff --git a/pubsublite/admin_test.go b/pubsublite/admin_test.go index 864badf4261..8a63305073f 100644 --- a/pubsublite/admin_test.go +++ b/pubsublite/admin_test.go @@ -38,14 +38,14 @@ func TestAdminTopicCRUD(t *testing.T) { ctx := context.Background() // Inputs - topicPath := TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "my-topic"} + const topicPath = "projects/my-proj/locations/us-central1-a/topics/my-topic" topicConfig := TopicConfig{ Name: topicPath, PartitionCount: 2, PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 4, PerPartitionBytes: 30 * gibi, - RetentionDuration: time.Duration(24 * time.Hour), + RetentionDuration: 24 * time.Hour, } updateConfig := TopicConfigToUpdate{ Name: topicPath, @@ -54,6 +54,9 @@ func TestAdminTopicCRUD(t *testing.T) { PerPartitionBytes: 40 * gibi, RetentionDuration: InfiniteRetention, } + emptyUpdateConfig := TopicConfigToUpdate{ + Name: topicPath, + } // Expected requests and fake responses wantCreateReq := &pb.CreateTopicRequest{ @@ -95,7 +98,7 @@ func TestAdminTopicCRUD(t *testing.T) { t.Errorf("UpdateTopic() got: %v\nwant: %v", gotConfig, topicConfig) } - if _, err := admin.UpdateTopic(ctx, TopicConfigToUpdate{}); !test.ErrorEqual(err, errNoTopicFieldsUpdated) { + if _, err := admin.UpdateTopic(ctx, emptyUpdateConfig); !test.ErrorEqual(err, errNoTopicFieldsUpdated) { t.Errorf("UpdateTopic() got err: (%v), want err: (%v)", err, errNoTopicFieldsUpdated) } @@ -120,9 +123,9 @@ func TestAdminListTopics(t *testing.T) { ctx := context.Background() // Inputs - locationPath := LocationPath{Project: "my-proj", Zone: "us-central1-a"} + const locationPath = "projects/my-proj/locations/us-central1-a" topicConfig1 := TopicConfig{ - Name: TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "topic1"}, + Name: "projects/my-proj/locations/us-central1-a/topics/topic1", PartitionCount: 2, PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 4, @@ -130,7 +133,7 @@ func TestAdminListTopics(t *testing.T) { RetentionDuration: 24 * time.Hour, } topicConfig2 := TopicConfig{ - Name: TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "topic2"}, + Name: "projects/my-proj/locations/us-central1-a/topics/topic2", PartitionCount: 4, PublishCapacityMiBPerSec: 6, SubscribeCapacityMiBPerSec: 8, @@ -138,7 +141,7 @@ func TestAdminListTopics(t *testing.T) { RetentionDuration: InfiniteRetention, } topicConfig3 := TopicConfig{ - Name: TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "topic3"}, + Name: "projects/my-proj/locations/us-central1-a/topics/topic3", PartitionCount: 3, PublishCapacityMiBPerSec: 8, SubscribeCapacityMiBPerSec: 12, @@ -193,15 +196,20 @@ func TestAdminListTopics(t *testing.T) { func TestAdminListTopicSubscriptions(t *testing.T) { ctx := context.Background() + // Inputs + const ( + topicPath = "projects/my-proj/locations/us-central1-a/topics/my-topic" + subscription1 = "projects/my-proj/locations/us-central1-a/subscriptions/subscription1" + subscription2 = "projects/my-proj/locations/us-central1-a/subscriptions/subscription2" + subscription3 = "projects/my-proj/locations/us-central1-a/subscriptions/subscription3" + ) + // Expected requests and fake responses wantListReq1 := &pb.ListTopicSubscriptionsRequest{ Name: "projects/my-proj/locations/us-central1-a/topics/my-topic", } listResp1 := &pb.ListTopicSubscriptionsResponse{ - Subscriptions: []string{ - "projects/my-proj/locations/us-central1-a/subscriptions/subscription1", - "projects/my-proj/locations/us-central1-a/subscriptions/subscription2", - }, + Subscriptions: []string{subscription1, subscription2}, NextPageToken: "next_token", } wantListReq2 := &pb.ListTopicSubscriptionsRequest{ @@ -209,9 +217,7 @@ func TestAdminListTopicSubscriptions(t *testing.T) { PageToken: "next_token", } listResp2 := &pb.ListTopicSubscriptionsResponse{ - Subscriptions: []string{ - "projects/my-proj/locations/us-central1-a/subscriptions/subscription3", - }, + Subscriptions: []string{subscription3}, } verifiers := test.NewVerifiers(t) @@ -222,13 +228,7 @@ func TestAdminListTopicSubscriptions(t *testing.T) { admin := newTestAdminClient(t) - // Inputs - topicPath := TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "my-topic"} - subscription1 := SubscriptionPath{Project: "my-proj", Zone: "us-central1-a", SubscriptionID: "subscription1"} - subscription2 := SubscriptionPath{Project: "my-proj", Zone: "us-central1-a", SubscriptionID: "subscription2"} - subscription3 := SubscriptionPath{Project: "my-proj", Zone: "us-central1-a", SubscriptionID: "subscription3"} - - var gotSubscriptions []SubscriptionPath + var gotSubscriptions []string subsPathIt := admin.TopicSubscriptions(ctx, topicPath) for { subsPath, err := subsPathIt.Next() @@ -242,7 +242,7 @@ func TestAdminListTopicSubscriptions(t *testing.T) { } } - wantSubscriptions := []SubscriptionPath{subscription1, subscription2, subscription3} + wantSubscriptions := []string{subscription1, subscription2, subscription3} if !testutil.Equal(gotSubscriptions, wantSubscriptions) { t.Errorf("TopicSubscriptions() got: %v\nwant: %v", gotSubscriptions, wantSubscriptions) } @@ -252,8 +252,8 @@ func TestAdminSubscriptionCRUD(t *testing.T) { ctx := context.Background() // Inputs - topicPath := TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "my-subscription"} - subscriptionPath := SubscriptionPath{Project: "my-proj", Zone: "us-central1-a", SubscriptionID: "my-subscription"} + const topicPath = "projects/my-proj/locations/us-central1-a/topics/my-topic" + const subscriptionPath = "projects/my-proj/locations/us-central1-a/subscriptions/my-subscription" subscriptionConfig := SubscriptionConfig{ Name: subscriptionPath, Topic: topicPath, @@ -263,6 +263,9 @@ func TestAdminSubscriptionCRUD(t *testing.T) { Name: subscriptionPath, DeliveryRequirement: DeliverAfterStored, } + emptyUpdateConfig := SubscriptionConfigToUpdate{ + Name: subscriptionPath, + } // Expected requests and fake responses wantCreateReq := &pb.CreateSubscriptionRequest{ @@ -300,7 +303,7 @@ func TestAdminSubscriptionCRUD(t *testing.T) { t.Errorf("UpdateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig) } - if _, err := admin.UpdateSubscription(ctx, SubscriptionConfigToUpdate{}); !test.ErrorEqual(err, errNoSubscriptionFieldsUpdated) { + if _, err := admin.UpdateSubscription(ctx, emptyUpdateConfig); !test.ErrorEqual(err, errNoSubscriptionFieldsUpdated) { t.Errorf("UpdateSubscription() got err: (%v), want err: (%v)", err, errNoSubscriptionFieldsUpdated) } @@ -319,20 +322,20 @@ func TestAdminListSubscriptions(t *testing.T) { ctx := context.Background() // Inputs - locationPath := LocationPath{Project: "my-proj", Zone: "us-central1-a"} + const locationPath = "projects/my-proj/locations/us-central1-a" subscriptionConfig1 := SubscriptionConfig{ - Name: SubscriptionPath{Project: "my-proj", Zone: "us-central1-a", SubscriptionID: "subscription1"}, - Topic: TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "topic1"}, + Name: "projects/my-proj/locations/us-central1-a/subscriptions/subscription1", + Topic: "projects/my-proj/locations/us-central1-a/topics/topic1", DeliveryRequirement: DeliverImmediately, } subscriptionConfig2 := SubscriptionConfig{ - Name: SubscriptionPath{Project: "my-proj", Zone: "us-central1-a", SubscriptionID: "subscription2"}, - Topic: TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "topic2"}, + Name: "projects/my-proj/locations/us-central1-a/subscriptions/subscription2", + Topic: "projects/my-proj/locations/us-central1-a/topics/topic2", DeliveryRequirement: DeliverAfterStored, } subscriptionConfig3 := SubscriptionConfig{ - Name: SubscriptionPath{Project: "my-proj", Zone: "us-central1-a", SubscriptionID: "subscription3"}, - Topic: TopicPath{Project: "my-proj", Zone: "us-central1-a", TopicID: "topic3"}, + Name: "projects/my-proj/locations/us-central1-a/subscriptions/subscription3", + Topic: "projects/my-proj/locations/us-central1-a/topics/topic3", DeliveryRequirement: DeliverImmediately, } @@ -379,3 +382,44 @@ func TestAdminListSubscriptions(t *testing.T) { t.Errorf("Subscriptions() got: -, want: +\n%s", diff) } } + +func TestAdminValidateResourcePaths(t *testing.T) { + ctx := context.Background() + + // Note: no server requests expected. + verifiers := test.NewVerifiers(t) + mockServer.OnTestStart(verifiers) + defer mockServer.OnTestEnd() + + admin := newTestAdminClient(t) + defer admin.Close() + + if _, err := admin.Topic(ctx, "INVALID"); err == nil { + t.Errorf("Topic() should fail") + } + if _, err := admin.TopicPartitions(ctx, "INVALID"); err == nil { + t.Errorf("TopicPartitions() should fail") + } + if err := admin.DeleteTopic(ctx, "INVALID"); err == nil { + t.Errorf("DeleteTopic() should fail") + } + if _, err := admin.Subscription(ctx, "INVALID"); err == nil { + t.Errorf("Subscription() should fail") + } + if err := admin.DeleteSubscription(ctx, "INVALID"); err == nil { + t.Errorf("DeleteTopic() should fail") + } + + topicIt := admin.Topics(ctx, "INVALID") + if _, err := topicIt.Next(); err == nil { + t.Errorf("TopicIterator.Next() should fail") + } + subsPathIt := admin.TopicSubscriptions(ctx, "INVALID") + if _, err := subsPathIt.Next(); err == nil { + t.Errorf("SubscriptionPathIterator.Next() should fail") + } + subsIt := admin.Subscriptions(ctx, "INVALID") + if _, err := subsIt.Next(); err == nil { + t.Errorf("SubscriptionIterator.Next() should fail") + } +} diff --git a/pubsublite/config.go b/pubsublite/config.go index 621f1da444b..aba5584da87 100644 --- a/pubsublite/config.go +++ b/pubsublite/config.go @@ -29,12 +29,22 @@ import ( // storage). const InfiniteRetention = time.Duration(-1) -// TopicConfig describes the properties of a Google Pub/Sub Lite topic. +// TopicConfig describes the properties of a Pub/Sub Lite topic. // See https://cloud.google.com/pubsub/lite/docs/topics for more information // about how topics are configured. type TopicConfig struct { - // The full path of a topic. - Name TopicPath + // The full path of the topic, in the format: + // "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". + // + // - PROJECT_ID: The project ID (e.g. "my-project") or the project number + // (e.g. "987654321") can be provided. + // - ZONE: The Google Cloud zone (e.g. "us-central1-a") where the topic is + // located. See https://cloud.google.com/pubsub/lite/docs/locations for the + // list of zones where Pub/Sub Lite is available. + // - TOPIC_ID: The ID of the topic (e.g. "my-topic"). See + // https://cloud.google.com/pubsub/docs/admin#resource_names for information + // about valid topic IDs. + Name string // The number of partitions in the topic. Must be at least 1. Cannot be // changed after creation. @@ -62,7 +72,7 @@ type TopicConfig struct { func (tc *TopicConfig) toProto() *pb.Topic { topicpb := &pb.Topic{ - Name: tc.Name.String(), + Name: tc.Name, PartitionConfig: &pb.Topic_PartitionConfig{ Count: int64(tc.PartitionCount), Dimension: &pb.Topic_PartitionConfig_Capacity_{ @@ -83,15 +93,10 @@ func (tc *TopicConfig) toProto() *pb.Topic { } func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { - name, err := parseTopicPath(t.GetName()) - if err != nil { - return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName()) - } - partitionCfg := t.GetPartitionConfig() retentionCfg := t.GetRetentionConfig() topic := &TopicConfig{ - Name: name, + Name: t.GetName(), PartitionCount: int(partitionCfg.GetCount()), PublishCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetPublishMibPerSec()), SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()), @@ -111,8 +116,9 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { // TopicConfigToUpdate specifies the properties to update for a topic. type TopicConfigToUpdate struct { - // The full path of the topic to update. Required. - Name TopicPath + // The full path of the topic to update, in the format: + // "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". Required. + Name string // If non-zero, will update the publish throughput capacity per partition. PublishCapacityMiBPerSec int @@ -131,7 +137,7 @@ type TopicConfigToUpdate struct { func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { updatedTopic := &pb.Topic{ - Name: tc.Name.String(), + Name: tc.Name, PartitionConfig: &pb.Topic_PartitionConfig{ Dimension: &pb.Topic_PartitionConfig_Capacity_{ Capacity: &pb.Topic_PartitionConfig_Capacity{ @@ -175,12 +181,11 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { type DeliveryRequirement int const ( - // UnspecifiedDeliveryRequirement represents and unset delivery requirement. + // UnspecifiedDeliveryRequirement represents an unset delivery requirement. UnspecifiedDeliveryRequirement DeliveryRequirement = iota - // DeliverImmediately means the server will not not wait for a published - // message to be successfully written to storage before delivering it to - // subscribers. + // DeliverImmediately means the server will not wait for a published message + // to be successfully written to storage before delivering it to subscribers. DeliverImmediately // DeliverAfterStored means the server will not deliver a published message to @@ -189,17 +194,27 @@ const ( DeliverAfterStored ) -// SubscriptionConfig describes the properties of a Google Pub/Sub Lite -// subscription, which is attached to a topic. +// SubscriptionConfig describes the properties of a Pub/Sub Lite subscription, +// which is attached to a Pub/Sub Lite topic. // See https://cloud.google.com/pubsub/lite/docs/subscriptions for more // information about how subscriptions are configured. type SubscriptionConfig struct { - // The full path of a subscription. - Name SubscriptionPath - - // The name of the topic this subscription is attached to. This cannot be + // The full path of the subscription, in the format: + // "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". + // + // - PROJECT_ID: The project ID (e.g. "my-project") or the project number + // (e.g. "987654321") can be provided. + // - ZONE: The Google Cloud zone (e.g. "us-central1-a") of the corresponding + // topic. + // - SUBSCRIPTION_ID: The ID of the subscription (e.g. "my-subscription"). See + // https://cloud.google.com/pubsub/docs/admin#resource_names for information + // about valid subscription IDs. + Name string + + // The path of the topic that this subscription is attached to, in the format: + // "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". This cannot be // changed after creation. - Topic TopicPath + Topic string // Whether a message should be delivered to subscribers immediately after it // has been published or after it has been successfully written to storage. @@ -207,38 +222,35 @@ type SubscriptionConfig struct { } func (sc *SubscriptionConfig) toProto() *pb.Subscription { - return &pb.Subscription{ - Name: sc.Name.String(), - Topic: sc.Topic.String(), - DeliveryConfig: &pb.Subscription_DeliveryConfig{ + subspb := &pb.Subscription{ + Name: sc.Name, + Topic: sc.Topic, + } + if sc.DeliveryRequirement > 0 { + subspb.DeliveryConfig = &pb.Subscription_DeliveryConfig{ // Note: Assumes DeliveryRequirement enum values match API proto. DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement), - }, + } } + return subspb } -func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) { - name, err := parseSubscriptionPath(s.GetName()) - if err != nil { - return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName()) - } - topic, err := parseTopicPath(s.GetTopic()) - if err != nil { - return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic()) - } +func protoToSubscriptionConfig(s *pb.Subscription) *SubscriptionConfig { return &SubscriptionConfig{ - Name: name, - Topic: topic, + Name: s.GetName(), + Topic: s.GetTopic(), // Note: Assumes DeliveryRequirement enum values match API proto. DeliveryRequirement: DeliveryRequirement(s.GetDeliveryConfig().GetDeliveryRequirement().Number()), - }, nil + } } // SubscriptionConfigToUpdate specifies the properties to update for a // subscription. type SubscriptionConfigToUpdate struct { - // The full path of the subscription to update. Required. - Name SubscriptionPath + // The full path of the subscription to update, in the format: + // "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". + // Required. + Name string // If non-zero, updates the message delivery requirement. DeliveryRequirement DeliveryRequirement @@ -246,7 +258,7 @@ type SubscriptionConfigToUpdate struct { func (sc *SubscriptionConfigToUpdate) toUpdateRequest() *pb.UpdateSubscriptionRequest { updatedSubs := &pb.Subscription{ - Name: sc.Name.String(), + Name: sc.Name, DeliveryConfig: &pb.Subscription_DeliveryConfig{ // Note: Assumes DeliveryRequirement enum values match API proto. DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement), diff --git a/pubsublite/config_test.go b/pubsublite/config_test.go index 9ec387b73d9..cc583e74529 100644 --- a/pubsublite/config_test.go +++ b/pubsublite/config_test.go @@ -18,7 +18,7 @@ import ( "time" "cloud.google.com/go/internal/testutil" - "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/proto" dpb "github.com/golang/protobuf/ptypes/duration" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" @@ -30,10 +30,9 @@ func TestTopicConfigToProtoConversion(t *testing.T) { desc string topicpb *pb.Topic wantConfig *TopicConfig - wantErr bool }{ { - desc: "valid: retention duration set", + desc: "retention duration set", topicpb: &pb.Topic{ Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", PartitionConfig: &pb.Topic_PartitionConfig{ @@ -54,10 +53,7 @@ func TestTopicConfigToProtoConversion(t *testing.T) { }, }, wantConfig: &TopicConfig{ - Name: TopicPath{ - Project: "my-proj", - Zone: "us-central1-c", - TopicID: "my-topic"}, + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", PartitionCount: 2, PublishCapacityMiBPerSec: 6, SubscribeCapacityMiBPerSec: 16, @@ -66,7 +62,7 @@ func TestTopicConfigToProtoConversion(t *testing.T) { }, }, { - desc: "valid: retention duration unset", + desc: "retention duration unset", topicpb: &pb.Topic{ Name: "projects/my-proj/locations/europe-west1-b/topics/my-topic", PartitionConfig: &pb.Topic_PartitionConfig{ @@ -83,11 +79,7 @@ func TestTopicConfigToProtoConversion(t *testing.T) { }, }, wantConfig: &TopicConfig{ - Name: TopicPath{ - Project: "my-proj", - Zone: "europe-west1-b", - TopicID: "my-topic", - }, + Name: "projects/my-proj/locations/europe-west1-b/topics/my-topic", PartitionCount: 3, PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 8, @@ -95,25 +87,16 @@ func TestTopicConfigToProtoConversion(t *testing.T) { RetentionDuration: InfiniteRetention, }, }, - { - desc: "invalid: topic desc", - topicpb: &pb.Topic{ - Name: "invalid_topic_desc", - }, - wantErr: true, - }, } { t.Run(tc.desc, func(t *testing.T) { gotConfig, gotErr := protoToTopicConfig(tc.topicpb) - if !testutil.Equal(gotConfig, tc.wantConfig) || (gotErr != nil) != tc.wantErr { - t.Errorf("protoToTopicConfig(%v)\ngot (%v, %v)\nwant (%v, err=%v)", tc.topicpb, gotConfig, gotErr, tc.wantConfig, tc.wantErr) + if !testutil.Equal(gotConfig, tc.wantConfig) || gotErr != nil { + t.Errorf("protoToTopicConfig(%v)\ngot (%v, %v)\nwant (%v, nil)", tc.topicpb, gotConfig, gotErr, tc.wantConfig) } // Check that the config converts back to an identical proto. - if tc.wantConfig != nil { - if gotProto := tc.wantConfig.toProto(); !proto.Equal(gotProto, tc.topicpb) { - t.Errorf("TopicConfig: %v toProto():\ngot: %v\nwant: %v", tc.wantConfig, gotProto, tc.topicpb) - } + if gotProto := tc.wantConfig.toProto(); !proto.Equal(gotProto, tc.topicpb) { + t.Errorf("TopicConfig: %v toProto():\ngot: %v\nwant: %v", tc.wantConfig, gotProto, tc.topicpb) } }) } @@ -128,11 +111,7 @@ func TestTopicUpdateRequest(t *testing.T) { { desc: "all fields set", config: &TopicConfigToUpdate{ - Name: TopicPath{ - Project: "my-proj", - Zone: "us-central1-c", - TopicID: "my-topic", - }, + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 12, PerPartitionBytes: 500000, @@ -167,11 +146,7 @@ func TestTopicUpdateRequest(t *testing.T) { { desc: "clear retention duration", config: &TopicConfigToUpdate{ - Name: TopicPath{ - Project: "my-proj", - Zone: "us-central1-c", - TopicID: "my-topic", - }, + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", RetentionDuration: InfiniteRetention, }, want: &pb.UpdateTopicRequest{ @@ -194,11 +169,7 @@ func TestTopicUpdateRequest(t *testing.T) { { desc: "no fields set", config: &TopicConfigToUpdate{ - Name: TopicPath{ - Project: "my-proj", - Zone: "us-central1-c", - TopicID: "my-topic", - }, + Name: "projects/my-proj/locations/us-central1-c/topics/my-topic", }, want: &pb.UpdateTopicRequest{ Topic: &pb.Topic{ @@ -227,10 +198,9 @@ func TestSubscriptionConfigToProtoConversion(t *testing.T) { desc string subspb *pb.Subscription wantConfig *SubscriptionConfig - wantErr bool }{ { - desc: "valid", + desc: "with delivery config", subspb: &pb.Subscription{ Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", @@ -239,47 +209,33 @@ func TestSubscriptionConfigToProtoConversion(t *testing.T) { }, }, wantConfig: &SubscriptionConfig{ - Name: SubscriptionPath{ - Project: "my-proj", - Zone: "us-central1-c", - SubscriptionID: "my-subs", - }, - Topic: TopicPath{ - Project: "my-proj", - Zone: "us-central1-c", - TopicID: "my-topic", - }, + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", DeliveryRequirement: DeliverAfterStored, }, }, { - desc: "invalid: subscription desc", + desc: "missing delivery config", subspb: &pb.Subscription{ - Name: "invalid_subscription_desc", + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", }, - wantErr: true, - }, - { - desc: "invalid: topic desc", - subspb: &pb.Subscription{ - Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", - Topic: "invalid_topic_desc", + wantConfig: &SubscriptionConfig{ + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", + Topic: "projects/my-proj/locations/us-central1-c/topics/my-topic", + DeliveryRequirement: UnspecifiedDeliveryRequirement, }, - wantErr: true, }, } { t.Run(tc.desc, func(t *testing.T) { - gotConfig, gotErr := protoToSubscriptionConfig(tc.subspb) - if !testutil.Equal(gotConfig, tc.wantConfig) || (gotErr != nil) != tc.wantErr { - t.Errorf("protoToSubscriptionConfig(%v)\ngot (%v, %v)\nwant (%v, err=%v)", tc.subspb, gotConfig, gotErr, tc.wantConfig, tc.wantErr) + gotConfig := protoToSubscriptionConfig(tc.subspb) + if !testutil.Equal(gotConfig, tc.wantConfig) { + t.Errorf("protoToSubscriptionConfig(%v)\ngot: %v\nwant: %v", tc.subspb, gotConfig, tc.wantConfig) } // Check that the config converts back to an identical proto. - if tc.wantConfig != nil { - if gotProto := tc.wantConfig.toProto(); !proto.Equal(gotProto, tc.subspb) { - t.Errorf("SubscriptionConfig: %v toProto():\ngot: %v\nwant: %v", tc.wantConfig, gotProto, tc.subspb) - } + if gotProto := tc.wantConfig.toProto(); !proto.Equal(gotProto, tc.subspb) { + t.Errorf("SubscriptionConfig: %v toProto():\ngot: %v\nwant: %v", tc.wantConfig, gotProto, tc.subspb) } }) } @@ -294,11 +250,7 @@ func TestSubscriptionUpdateRequest(t *testing.T) { { desc: "all fields set", config: &SubscriptionConfigToUpdate{ - Name: SubscriptionPath{ - Project: "my-proj", - Zone: "us-central1-c", - SubscriptionID: "my-subs", - }, + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", DeliveryRequirement: DeliverImmediately, }, want: &pb.UpdateSubscriptionRequest{ @@ -318,11 +270,7 @@ func TestSubscriptionUpdateRequest(t *testing.T) { { desc: "no fields set", config: &SubscriptionConfigToUpdate{ - Name: SubscriptionPath{ - Project: "my-proj", - Zone: "us-central1-c", - SubscriptionID: "my-subs", - }, + Name: "projects/my-proj/locations/us-central1-c/subscriptions/my-subs", }, want: &pb.UpdateSubscriptionRequest{ Subscription: &pb.Subscription{ diff --git a/pubsublite/doc.go b/pubsublite/doc.go index 6a89814dead..51afb016953 100644 --- a/pubsublite/doc.go +++ b/pubsublite/doc.go @@ -13,23 +13,23 @@ /* Package pubsublite provides an easy way to publish and receive messages using -Google Cloud Pub/Sub Lite. +Google Pub/Sub Lite. -Google Cloud Pub/Sub is designed to provide reliable, many-to-many, asynchronous -messaging between applications. Publisher applications can send messages to a -topic and other applications can subscribe to that topic to receive the -messages. By decoupling senders and receivers, Google Cloud Pub/Sub allows +Google Pub/Sub services are designed to provide reliable, many-to-many, +asynchronous messaging between applications. Publisher applications can send +messages to a topic and other applications can subscribe to that topic to +receive the messages. By decoupling senders and receivers, Google Pub/Sub allows developers to communicate between independently written applications. -Compared to Google Cloud Pub/Sub, Pub/Sub Lite provides partitioned zonal data -storage with predefined throughput and storage capacity. Guidance on how to -choose between Google Cloud Pub/Sub and Pub/Sub Lite is available at +Compared to Cloud Pub/Sub, Pub/Sub Lite provides partitioned zonal data storage +with predefined throughput and storage capacity. Guidance on how to choose +between Cloud Pub/Sub and Pub/Sub Lite is available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite. -More information about Google Cloud Pub/Sub Lite is available at +More information about Pub/Sub Lite is available at https://cloud.google.com/pubsub/lite. -See https://godoc.org/cloud.google.com/go for authentication, timeouts, +See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package. Note: This library is in ALPHA. Backwards-incompatible changes may be made @@ -38,14 +38,9 @@ before stable v1.0.0 is released. Creating Topics -Messages are published to topics. Cloud Pub/Sub Lite topics may be created like -so (error handling omitted for brevity): +Messages are published to topics. Pub/Sub Lite topics may be created like so: - topicPath := pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - } + const topicPath = "projects/my-project/locations/us-central1-c/topics/my-topic" topicConfig := pubsublite.TopicConfig{ Name: topicPath, PartitionCount: 1, @@ -54,41 +49,59 @@ so (error handling omitted for brevity): PerPartitionBytes: 30 * 1024 * 1024 * 1024, // 30 GiB RetentionDuration: pubsublite.InfiniteRetention, } - - region, err := pubsublite.ZoneToRegion(topicPath.Zone) - adminClient, err := pubsublite.NewAdminClient(ctx, region) - _, err = adminClient.CreateTopic(ctx, topicConfig) + adminClient, err := pubsublite.NewAdminClient(ctx, "us-central1") + if err != nil { + // TODO: Handle error. + } + topic, err = adminClient.CreateTopic(ctx, topicConfig) + if err != nil { + // TODO: Handle error. + } See https://cloud.google.com/pubsub/lite/docs/topics for more information about -how Cloud Pub/Sub Lite topics are configured. +how Pub/Sub Lite topics are configured. + +See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions +and zones where Pub/Sub Lite is available. Publishing The pubsublite/ps subpackage contains clients for publishing and receiving messages, which have similar interfaces to their Topic and Subscription -counterparts in the pubsub package. Cloud Pub/Sub Lite uses gRPC streams -extensively for high throughput. For more differences, see -https://godoc.org/cloud.google.com/go/pubsublite/ps. +counterparts in the Cloud Pub/Sub library: +https://pkg.go.dev/cloud.google.com/go/pubsub. + +Pub/Sub Lite uses gRPC streams extensively for high throughput. For more +differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/ps. To publish messages to a topic, first create a PublisherClient: publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topicPath) + if err != nil { + // TODO: Handle error. + } Then call Publish: - // Note: result is a pubsub.PublishResult result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")}) Publish queues the message for publishing and returns immediately. When enough messages have accumulated, or enough time has elapsed, the batch of messages is -sent to the Cloud Pub/Sub Lite service. Thresholds for batching can be -configured in PublishSettings. +sent to the Pub/Sub Lite service. Thresholds for batching can be configured in +PublishSettings. -Publish returns a PublishResult, which behaves like a future: its Get method +Publish returns a PublishResult, which behaves like a future; its Get method blocks until the message has been sent (or has failed to be sent) to the -service. Once you've finishing publishing, call Stop to flush all messages to -the service and close gRPC streams: +service: + + id, err := result.Get(ctx) + if err != nil { + // TODO: Handle error. + } + +Once you've finishing publishing, call Stop to flush all messages to the service +and close gRPC streams: publisher.Stop() @@ -98,23 +111,22 @@ about publishing. Creating Subscriptions -To receive messages published to a topic, clients create subscriptions to the -topic. There may be more than one subscription per topic; each message that is +To receive messages published to a topic, create a subscription to the topic. +There may be more than one subscription per topic; each message that is published to the topic will be delivered to all of its subscriptions. -Cloud Pub/Sub Lite subscriptions may be created like so: +Pub/Sub Lite subscriptions may be created like so: - subscriptionPath := pubsublite.SubscriptionPath{ - Project: "project-id", - Zone: "zone", - SubscriptionID: "subscription-id", - } + const subscriptionPath = "projects/my-project/locations/us-central1-c/subscriptions/my-subscription" subscriptionConfig := pubsublite.SubscriptionConfig{ Name: subscriptionPath, Topic: topicPath, DeliveryRequirement: pubsublite.DeliverImmediately, } - _, err = admin.CreateSubscription(ctx, subscriptionConfig) + subscription, err = adminClient.CreateSubscription(ctx, subscriptionConfig) + if err != nil { + // TODO: Handle error. + } See https://cloud.google.com/pubsub/lite/docs/subscriptions for more information about how subscriptions are configured. @@ -134,7 +146,7 @@ Messages are then consumed from a subscription via callback. m.Ack() }) if err != nil { - // Handle error. + // TODO: Handle error. } The callback may be invoked concurrently by multiple goroutines (one per @@ -144,11 +156,11 @@ Receive, cancel its context: cancel() Clients must call pubsub.Message.Ack() or pubsub.Message.Nack() for every -message received. Cloud Pub/Sub Lite does not have ACK deadlines. Cloud Pub/Sub -Lite also does not actually have the concept of NACK. The default behavior -terminates the SubscriberClient. In Cloud Pub/Sub Lite, only a single subscriber -for a given subscription is connected to any partition at a time, and there is -no other client that may be able to handle messages. +message received. Pub/Sub Lite does not have ACK deadlines. Pub/Sub Lite also +does not actually have the concept of NACK. The default behavior terminates the +SubscriberClient. In Pub/Sub Lite, only a single subscriber for a given +subscription is connected to any partition at a time, and there is no other +client that may be able to handle messages. See https://cloud.google.com/pubsub/lite/docs/subscribing for more information about receiving messages. diff --git a/pubsublite/example_test.go b/pubsublite/example_test.go index 7ada9f90d0a..8ee45bc1da4 100644 --- a/pubsublite/example_test.go +++ b/pubsublite/example_test.go @@ -31,11 +31,7 @@ func ExampleAdminClient_CreateTopic() { const gib = 1 << 30 topicConfig := pubsublite.TopicConfig{ - Name: pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - }, + Name: "projects/my-project/locations/zone/topics/my-topic", PartitionCount: 2, // Must be at least 1. PublishCapacityMiBPerSec: 4, // Must be 4-16 MiB/s. SubscribeCapacityMiBPerSec: 8, // Must be 4-32 MiB/s. @@ -57,11 +53,7 @@ func ExampleAdminClient_UpdateTopic() { } updateConfig := pubsublite.TopicConfigToUpdate{ - Name: pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - }, + Name: "projects/my-project/locations/zone/topics/my-topic", PublishCapacityMiBPerSec: 8, SubscribeCapacityMiBPerSec: 16, // Garbage collect messages older than 24 hours. @@ -80,11 +72,7 @@ func ExampleAdminClient_DeleteTopic() { // TODO: Handle error. } - topic := pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - } + const topic = "projects/my-project/locations/zone/topics/my-topic" if err := admin.DeleteTopic(ctx, topic); err != nil { // TODO: Handle error. } @@ -98,8 +86,7 @@ func ExampleAdminClient_Topics() { } // List the configs of all topics in the given zone for the project. - location := pubsublite.LocationPath{Project: "project-id", Zone: "zone"} - it := admin.Topics(ctx, location) + it := admin.Topics(ctx, "projects/my-project/locations/zone") for { topicConfig, err := it.Next() if err == iterator.Done { @@ -120,11 +107,7 @@ func ExampleAdminClient_TopicSubscriptions() { } // List the paths of all subscriptions of a topic. - topic := pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - } + const topic = "projects/my-project/locations/zone/topics/my-topic" it := admin.TopicSubscriptions(ctx, topic) for { subscriptionPath, err := it.Next() @@ -146,16 +129,8 @@ func ExampleAdminClient_CreateSubscription() { } subscriptionConfig := pubsublite.SubscriptionConfig{ - Name: pubsublite.SubscriptionPath{ - Project: "project-id", - Zone: "zone", - SubscriptionID: "subscription-id", - }, - Topic: pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - }, + Name: "projects/my-project/locations/zone/subscriptions/my-subscription", + Topic: "projects/my-project/locations/zone/topics/my-topic", // Do not wait for a published message to be successfully written to storage // before delivering it to subscribers. DeliveryRequirement: pubsublite.DeliverImmediately, @@ -174,11 +149,7 @@ func ExampleAdminClient_UpdateSubscription() { } updateConfig := pubsublite.SubscriptionConfigToUpdate{ - Name: pubsublite.SubscriptionPath{ - Project: "project-id", - Zone: "zone", - SubscriptionID: "subscription-id", - }, + Name: "projects/my-project/locations/zone/subscriptions/my-subscription", // Deliver a published message to subscribers after it has been successfully // written to storage. DeliveryRequirement: pubsublite.DeliverAfterStored, @@ -196,11 +167,7 @@ func ExampleAdminClient_DeleteSubscription() { // TODO: Handle error. } - subscription := pubsublite.SubscriptionPath{ - Project: "project-id", - Zone: "zone", - SubscriptionID: "subscription-id", - } + const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" if err := admin.DeleteSubscription(ctx, subscription); err != nil { // TODO: Handle error. } @@ -214,8 +181,7 @@ func ExampleAdminClient_Subscriptions() { } // List the configs of all subscriptions in the given zone for the project. - location := pubsublite.LocationPath{Project: "project-id", Zone: "zone"} - it := admin.Subscriptions(ctx, location) + it := admin.Subscriptions(ctx, "projects/my-project/locations/zone") for { subscriptionConfig, err := it.Next() if err == iterator.Done { diff --git a/pubsublite/integration_test.go b/pubsublite/integration_test.go index ac82aab60fa..2c8b5e633a8 100644 --- a/pubsublite/integration_test.go +++ b/pubsublite/integration_test.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/uid" "cloud.google.com/go/pubsublite/internal/test" + "cloud.google.com/go/pubsublite/internal/wire" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/api/iterator" @@ -37,12 +38,45 @@ var ( // The server returns topic and subscription configs with project numbers in // resource paths. These will not match a project id specified for integration // tests. - testCmpOptions = []cmp.Option{ - cmpopts.IgnoreFields(SubscriptionPath{}, "Project"), - cmpopts.IgnoreFields(TopicPath{}, "Project"), + pathCmpOptions = []cmp.Option{ + cmpopts.IgnoreFields(wire.TopicPath{}, "Project"), + cmpopts.IgnoreFields(wire.SubscriptionPath{}, "Project"), + } + configCmpOptions = []cmp.Option{ + cmp.Comparer(func(t1, t2 *TopicConfig) bool { + return cmp.Equal(t1, t2, cmpopts.IgnoreFields(TopicConfig{}, "Name")) && TopicPathsEqual(t1.Name, t2.Name) + }), + cmp.Comparer(func(s1, s2 *SubscriptionConfig) bool { + return cmp.Equal(s1, s2, cmpopts.IgnoreFields(SubscriptionConfig{}, "Name", "Topic")) && + TopicPathsEqual(s1.Topic, s2.Topic) && SubscriptionPathsEqual(s1.Name, s2.Name) + }), } ) +func TopicPathsEqual(topic1, topic2 string) bool { + tp1, err := wire.ParseTopicPath(topic1) + if err != nil { + return false + } + tp2, err := wire.ParseTopicPath(topic2) + if err != nil { + return false + } + return cmp.Equal(tp1, tp2, pathCmpOptions...) +} + +func SubscriptionPathsEqual(subscription1, subscription2 string) bool { + sp1, err := wire.ParseSubscriptionPath(subscription1) + if err != nil { + return false + } + sp2, err := wire.ParseSubscriptionPath(subscription2) + if err != nil { + return false + } + return cmp.Equal(sp1, sp2, pathCmpOptions...) +} + func initIntegrationTest(t *testing.T) { if testing.Short() { t.Skip("Integration tests skipped in short mode") @@ -75,13 +109,13 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio return admin } -func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name TopicPath) { +func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name string) { if err := admin.DeleteTopic(ctx, name); err != nil { t.Errorf("Failed to delete topic %s: %v", name, err) } } -func cleanUpSubscription(ctx context.Context, t *testing.T, admin *AdminClient, name SubscriptionPath) { +func cleanUpSubscription(ctx context.Context, t *testing.T, admin *AdminClient, name string) { if err := admin.DeleteSubscription(ctx, name); err != nil { t.Errorf("Failed to delete subscription %s: %v", name, err) } @@ -93,12 +127,12 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { ctx := context.Background() proj := testutil.ProjID() zone := test.RandomLiteZone() - region, _ := ZoneToRegion(zone) + region, _ := wire.ZoneToRegion(zone) resourceID := resourceIDs.New() - locationPath := LocationPath{Project: proj, Zone: zone} - topicPath := TopicPath{Project: proj, Zone: zone, TopicID: resourceID} - subscriptionPath := SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID} + locationPath := wire.LocationPath{Project: proj, Zone: zone}.String() + topicPath := wire.TopicPath{Project: proj, Zone: zone, TopicID: resourceID}.String() + subscriptionPath := wire.SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID}.String() t.Logf("Topic path: %s", topicPath) admin := adminClient(ctx, t, region) @@ -111,7 +145,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 4, PerPartitionBytes: 30 * gibi, - RetentionDuration: time.Duration(24 * time.Hour), + RetentionDuration: 24 * time.Hour, } gotTopicConfig, err := admin.CreateTopic(ctx, *newTopicConfig) @@ -119,13 +153,13 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { t.Fatalf("Failed to create topic: %v", err) } defer cleanUpTopic(ctx, t, admin, topicPath) - if diff := testutil.Diff(gotTopicConfig, newTopicConfig, testCmpOptions...); diff != "" { + if diff := testutil.Diff(gotTopicConfig, newTopicConfig, configCmpOptions...); diff != "" { t.Errorf("CreateTopic() got: -, want: +\n%s", diff) } if gotTopicConfig, err := admin.Topic(ctx, topicPath); err != nil { t.Errorf("Failed to get topic: %v", err) - } else if diff := testutil.Diff(gotTopicConfig, newTopicConfig, testCmpOptions...); diff != "" { + } else if diff := testutil.Diff(gotTopicConfig, newTopicConfig, configCmpOptions...); diff != "" { t.Errorf("Topic() got: -, want: +\n%s", diff) } @@ -142,14 +176,14 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { if err == iterator.Done { break } - if testutil.Equal(topic.Name, topicPath, testCmpOptions...) { + if TopicPathsEqual(topic.Name, topicPath) { foundTopic = topic break } } if foundTopic == nil { t.Error("Topics() did not return topic config") - } else if diff := testutil.Diff(foundTopic, newTopicConfig, testCmpOptions...); diff != "" { + } else if diff := testutil.Diff(foundTopic, newTopicConfig, configCmpOptions...); diff != "" { t.Errorf("Topics() found config: -, want: +\n%s", diff) } @@ -168,7 +202,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { } if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate1); err != nil { t.Errorf("Failed to update topic: %v", err) - } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig1, testCmpOptions...); diff != "" { + } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig1, configCmpOptions...); diff != "" { t.Errorf("UpdateTopic() got: -, want: +\n%s", diff) } @@ -187,7 +221,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { } if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate2); err != nil { t.Errorf("Failed to update topic: %v", err) - } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig2, testCmpOptions...); diff != "" { + } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig2, configCmpOptions...); diff != "" { t.Errorf("UpdateTopic() got: -, want: +\n%s", diff) } @@ -203,13 +237,13 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { t.Fatalf("Failed to create subscription: %v", err) } defer cleanUpSubscription(ctx, t, admin, subscriptionPath) - if diff := testutil.Diff(gotSubsConfig, newSubsConfig, testCmpOptions...); diff != "" { + if diff := testutil.Diff(gotSubsConfig, newSubsConfig, configCmpOptions...); diff != "" { t.Errorf("CreateSubscription() got: -, want: +\n%s", diff) } if gotSubsConfig, err := admin.Subscription(ctx, subscriptionPath); err != nil { t.Errorf("Failed to get subscription: %v", err) - } else if diff := testutil.Diff(gotSubsConfig, newSubsConfig, testCmpOptions...); diff != "" { + } else if diff := testutil.Diff(gotSubsConfig, newSubsConfig, configCmpOptions...); diff != "" { t.Errorf("Subscription() got: -, want: +\n%s", diff) } @@ -220,14 +254,14 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { if err == iterator.Done { break } - if testutil.Equal(subs.Name, subscriptionPath, testCmpOptions...) { + if SubscriptionPathsEqual(subs.Name, subscriptionPath) { foundSubs = subs break } } if foundSubs == nil { t.Error("Subscriptions() did not return subscription config") - } else if diff := testutil.Diff(foundSubs, gotSubsConfig, testCmpOptions...); diff != "" { + } else if diff := testutil.Diff(foundSubs, gotSubsConfig, configCmpOptions...); diff != "" { t.Errorf("Subscriptions() found config: -, want: +\n%s", diff) } @@ -238,7 +272,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { if err == iterator.Done { break } - if testutil.Equal(subsPath, subscriptionPath, testCmpOptions...) { + if SubscriptionPathsEqual(subsPath, subscriptionPath) { foundSubsPath = true break } @@ -258,7 +292,7 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) { } if gotSubsConfig, err := admin.UpdateSubscription(ctx, subsUpdate); err != nil { t.Errorf("Failed to update subscription: %v", err) - } else if diff := testutil.Diff(gotSubsConfig, wantUpdatedSubsConfig, testCmpOptions...); diff != "" { + } else if diff := testutil.Diff(gotSubsConfig, wantUpdatedSubsConfig, configCmpOptions...); diff != "" { t.Errorf("UpdateSubscription() got: -, want: +\n%s", diff) } } diff --git a/pubsublite/internal/wire/resources.go b/pubsublite/internal/wire/resources.go index b31b4dd0ff6..5df9e817cfe 100644 --- a/pubsublite/internal/wire/resources.go +++ b/pubsublite/internal/wire/resources.go @@ -15,6 +15,7 @@ package wire import ( "fmt" + "regexp" "strings" ) @@ -40,6 +41,109 @@ func ValidateRegion(input string) error { return nil } +// ZoneToRegion returns the region that the given zone is in. +func ZoneToRegion(zone string) (string, error) { + if err := ValidateZone(zone); err != nil { + return "", err + } + return zone[0:strings.LastIndex(zone, "-")], nil +} + +// LocationPath stores a path consisting of a project and zone. +type LocationPath struct { + // A Google Cloud project. The project ID (e.g. "my-project") or the project + // number (e.g. "987654321") can be provided. + Project string + + // A Google Cloud zone, for example "us-central1-a". + Zone string +} + +func (l LocationPath) String() string { + return fmt.Sprintf("projects/%s/locations/%s", l.Project, l.Zone) +} + +var locPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)$`) + +// ParseLocationPath parses a project/location path. +func ParseLocationPath(input string) (LocationPath, error) { + parts := locPathRE.FindStringSubmatch(input) + if len(parts) < 3 { + return LocationPath{}, fmt.Errorf("pubsublite: invalid location path %q. valid format is %q", + input, "projects/PROJECT_ID/locations/ZONE") + } + return LocationPath{Project: parts[1], Zone: parts[2]}, nil +} + +// TopicPath stores the full path of a Pub/Sub Lite topic. +type TopicPath struct { + // A Google Cloud project. The project ID (e.g. "my-project") or the project + // number (e.g. "987654321") can be provided. + Project string + + // A Google Cloud zone, for example "us-central1-a". + Zone string + + // The ID of the Pub/Sub Lite topic, for example "my-topic-name". + TopicID string +} + +func (t TopicPath) String() string { + return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID) +} + +// Location returns the topic's location path. +func (t TopicPath) Location() LocationPath { + return LocationPath{Project: t.Project, Zone: t.Zone} +} + +var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`) + +// ParseTopicPath parses the full path of a Pub/Sub Lite topic. +func ParseTopicPath(input string) (TopicPath, error) { + parts := topicPathRE.FindStringSubmatch(input) + if len(parts) < 4 { + return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q. valid format is %q", + input, "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID") + } + return TopicPath{Project: parts[1], Zone: parts[2], TopicID: parts[3]}, nil +} + +// SubscriptionPath stores the full path of a Pub/Sub Lite subscription. +type SubscriptionPath struct { + // A Google Cloud project. The project ID (e.g. "my-project") or the project + // number (e.g. "987654321") can be provided. + Project string + + // A Google Cloud zone. An example zone is "us-central1-a". + Zone string + + // The ID of the Pub/Sub Lite subscription, for example + // "my-subscription-name". + SubscriptionID string +} + +func (s SubscriptionPath) String() string { + return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID) +} + +// Location returns the subscription's location path. +func (s SubscriptionPath) Location() LocationPath { + return LocationPath{Project: s.Project, Zone: s.Zone} +} + +var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`) + +// ParseSubscriptionPath parses the full path of a Pub/Sub Lite subscription. +func ParseSubscriptionPath(input string) (SubscriptionPath, error) { + parts := subsPathRE.FindStringSubmatch(input) + if len(parts) < 4 { + return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q. valid format is %q", + input, "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID") + } + return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil +} + type topicPartition struct { Path string Partition int diff --git a/pubsublite/internal/wire/resources_test.go b/pubsublite/internal/wire/resources_test.go index 8191234a191..40582ebbde8 100644 --- a/pubsublite/internal/wire/resources_test.go +++ b/pubsublite/internal/wire/resources_test.go @@ -76,3 +76,197 @@ func TestValidateRegion(t *testing.T) { }) } } + +func TestZoneToRegion(t *testing.T) { + for _, tc := range []struct { + desc string + zone string + wantRegion string + wantErr bool + }{ + { + desc: "valid", + zone: "europe-west1-d", + wantRegion: "europe-west1", + wantErr: false, + }, + { + desc: "invalid: insufficient dashes", + zone: "europe-west1", + wantErr: true, + }, + { + desc: "invalid: no dashes", + zone: "europewest1", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotRegion, gotErr := ZoneToRegion(tc.zone) + if gotRegion != tc.wantRegion || (gotErr != nil) != tc.wantErr { + t.Errorf("ZoneToRegion(%q) = (%v, %v), want (%v, err=%v)", tc.zone, gotRegion, gotErr, tc.wantRegion, tc.wantErr) + } + }) + } +} + +func TestParseLocationPath(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantPath LocationPath + wantErr bool + }{ + { + desc: "valid: location path", + input: "projects/987654321/locations/europe-west1-d", + wantPath: LocationPath{Project: "987654321", Zone: "europe-west1-d"}, + }, + { + desc: "invalid: zone", + input: "europe-west1-d", + wantErr: true, + }, + { + desc: "invalid: missing project", + input: "projects//locations/europe-west1-d", + wantErr: true, + }, + { + desc: "invalid: missing zone", + input: "projects/987654321/locations/", + wantErr: true, + }, + { + desc: "invalid: has prefix", + input: "prefix/projects/987654321/locations/europe-west1-d", + wantErr: true, + }, + { + desc: "invalid: has suffix", + input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotPath, gotErr := ParseLocationPath(tc.input) + if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { + t.Errorf("ParseLocationPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) + } + }) + } +} + +func TestParseTopicPath(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantPath TopicPath + wantErr bool + }{ + { + desc: "valid: topic path", + input: "projects/987654321/locations/europe-west1-d/topics/my-topic", + wantPath: TopicPath{Project: "987654321", Zone: "europe-west1-d", TopicID: "my-topic"}, + }, + { + desc: "invalid: zone", + input: "europe-west1-d", + wantErr: true, + }, + { + desc: "invalid: subscription path", + input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: missing project", + input: "projects//locations/europe-west1-d/topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: missing zone", + input: "projects/987654321/locations//topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: missing topic id", + input: "projects/987654321/locations/europe-west1-d/topics/", + wantErr: true, + }, + { + desc: "invalid: has prefix", + input: "prefix/projects/987654321/locations/europe-west1-d/topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: has suffix", + input: "projects/my-project/locations/us-west1-b/topics/my-topic/subresource/desc", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotPath, gotErr := ParseTopicPath(tc.input) + if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { + t.Errorf("ParseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) + } + }) + } +} + +func TestParseSubscriptionPath(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantPath SubscriptionPath + wantErr bool + }{ + { + desc: "valid: subscription path", + input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs", + wantPath: SubscriptionPath{Project: "987654321", Zone: "europe-west1-d", SubscriptionID: "my-subs"}, + }, + { + desc: "invalid: zone", + input: "europe-west1-d", + wantErr: true, + }, + { + desc: "invalid: topic path", + input: "projects/987654321/locations/europe-west1-d/topics/my-topic", + wantErr: true, + }, + { + desc: "invalid: missing project", + input: "projects//locations/europe-west1-d/subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: missing zone", + input: "projects/987654321/locations//subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: missing subscription id", + input: "projects/987654321/locations/europe-west1-d/subscriptions/", + wantErr: true, + }, + { + desc: "invalid: has prefix", + input: "prefix/projects/987654321/locations/europe-west1-d/subscriptions/my-subs", + wantErr: true, + }, + { + desc: "invalid: has suffix", + input: "projects/my-project/locations/us-west1-b/subscriptions/my-subs/subresource/desc", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + gotPath, gotErr := ParseSubscriptionPath(tc.input) + if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { + t.Errorf("ParseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) + } + }) + } +} diff --git a/pubsublite/ps/example_test.go b/pubsublite/ps/example_test.go index 93f23a657ad..b57bfcf680f 100644 --- a/pubsublite/ps/example_test.go +++ b/pubsublite/ps/example_test.go @@ -18,17 +18,12 @@ import ( "fmt" "cloud.google.com/go/pubsub" - "cloud.google.com/go/pubsublite" "cloud.google.com/go/pubsublite/ps" ) func ExamplePublisherClient_Publish() { ctx := context.Background() - topic := pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - } + const topic = "projects/my-project/locations/zone/topics/my-topic" // NOTE: DefaultPublishSettings and empty PublishSettings{} are equivalent. publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) if err != nil { @@ -53,11 +48,7 @@ func ExamplePublisherClient_Publish() { func ExamplePublisherClient_Error() { ctx := context.Background() - topic := pubsublite.TopicPath{ - Project: "project-id", - Zone: "zone", - TopicID: "topic-id", - } + const topic = "projects/my-project/locations/zone/topics/my-topic" publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic) if err != nil { // TODO: Handle error. @@ -84,11 +75,7 @@ func ExamplePublisherClient_Error() { func ExampleSubscriberClient_Receive() { ctx := context.Background() - subscription := pubsublite.SubscriptionPath{ - Project: "project-id", - Zone: "zone", - SubscriptionID: "subscription-id", - } + const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" // NOTE: DefaultReceiveSettings and empty ReceiveSettings{} are equivalent. subscriber, err := ps.NewSubscriberClient(ctx, ps.DefaultReceiveSettings, subscription) if err != nil { @@ -115,11 +102,7 @@ func ExampleSubscriberClient_Receive() { // partitions in the associated topic. func ExampleSubscriberClient_Receive_maxOutstanding() { ctx := context.Background() - subscription := pubsublite.SubscriptionPath{ - Project: "project-id", - Zone: "zone", - SubscriptionID: "subscription-id", - } + const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" settings := ps.DefaultReceiveSettings settings.MaxOutstandingMessages = 5 settings.MaxOutstandingBytes = 10e6 diff --git a/pubsublite/ps/integration_test.go b/pubsublite/ps/integration_test.go index ff95e533d13..ca6aa4574e7 100644 --- a/pubsublite/ps/integration_test.go +++ b/pubsublite/ps/integration_test.go @@ -27,6 +27,7 @@ import ( "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsublite" "cloud.google.com/go/pubsublite/internal/test" + "cloud.google.com/go/pubsublite/internal/wire" "github.com/google/go-cmp/cmp/cmpopts" "golang.org/x/sync/errgroup" "google.golang.org/api/option" @@ -78,40 +79,40 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio return admin } -func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) *PublisherClient { +func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings, topic wire.TopicPath, opts ...option.ClientOption) *PublisherClient { opts = testOptions(ctx, t, opts...) - pub, err := NewPublisherClient(ctx, settings, topic, opts...) + pub, err := NewPublisherClient(ctx, settings, topic.String(), opts...) if err != nil { t.Fatalf("Failed to create publisher client: %v", err) } return pub } -func subscriberClient(ctx context.Context, t *testing.T, settings ReceiveSettings, subscription pubsublite.SubscriptionPath, opts ...option.ClientOption) *SubscriberClient { +func subscriberClient(ctx context.Context, t *testing.T, settings ReceiveSettings, subscription wire.SubscriptionPath, opts ...option.ClientOption) *SubscriberClient { opts = testOptions(ctx, t, opts...) - sub, err := NewSubscriberClient(ctx, settings, subscription, opts...) + sub, err := NewSubscriberClient(ctx, settings, subscription.String(), opts...) if err != nil { t.Fatalf("Failed to create publisher client: %v", err) } return sub } -func initResourcePaths(t *testing.T) (string, pubsublite.TopicPath, pubsublite.SubscriptionPath) { +func initResourcePaths(t *testing.T) (string, wire.TopicPath, wire.SubscriptionPath) { initIntegrationTest(t) proj := testutil.ProjID() zone := test.RandomLiteZone() - region, _ := pubsublite.ZoneToRegion(zone) + region, _ := wire.ZoneToRegion(zone) resourceID := resourceIDs.New() - topicPath := pubsublite.TopicPath{Project: proj, Zone: zone, TopicID: resourceID} - subscriptionPath := pubsublite.SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID} + topicPath := wire.TopicPath{Project: proj, Zone: zone, TopicID: resourceID} + subscriptionPath := wire.SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID} return region, topicPath, subscriptionPath } -func createTopic(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, topic pubsublite.TopicPath, partitionCount int) { +func createTopic(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, topic wire.TopicPath, partitionCount int) { topicConfig := pubsublite.TopicConfig{ - Name: topic, + Name: topic.String(), PartitionCount: partitionCount, PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 8, @@ -126,18 +127,18 @@ func createTopic(ctx context.Context, t *testing.T, admin *pubsublite.AdminClien } } -func cleanUpTopic(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, topic pubsublite.TopicPath) { - if err := admin.DeleteTopic(ctx, topic); err != nil { +func cleanUpTopic(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, topic wire.TopicPath) { + if err := admin.DeleteTopic(ctx, topic.String()); err != nil { t.Errorf("Failed to delete topic %s: %v", topic, err) } else { t.Logf("Deleted topic %s", topic) } } -func createSubscription(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, subscription pubsublite.SubscriptionPath, topic pubsublite.TopicPath) { +func createSubscription(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, subscription wire.SubscriptionPath, topic wire.TopicPath) { subConfig := &pubsublite.SubscriptionConfig{ - Name: subscription, - Topic: topic, + Name: subscription.String(), + Topic: topic.String(), DeliveryRequirement: pubsublite.DeliverImmediately, } _, err := admin.CreateSubscription(ctx, *subConfig) @@ -148,8 +149,8 @@ func createSubscription(ctx context.Context, t *testing.T, admin *pubsublite.Adm } } -func cleanUpSubscription(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, subscription pubsublite.SubscriptionPath) { - if err := admin.DeleteSubscription(ctx, subscription); err != nil { +func cleanUpSubscription(ctx context.Context, t *testing.T, admin *pubsublite.AdminClient, subscription wire.SubscriptionPath) { + if err := admin.DeleteSubscription(ctx, subscription.String()); err != nil { t.Errorf("Failed to delete subscription %s: %v", subscription, err) } else { t.Logf("Deleted subscription %s", subscription) @@ -164,7 +165,7 @@ func partitionNumbers(partitionCount int) []int { return partitions } -func publishMessages(t *testing.T, settings PublishSettings, topic pubsublite.TopicPath, msgs ...*pubsub.Message) { +func publishMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgs ...*pubsub.Message) { ctx := context.Background() publisher := publisherClient(ctx, t, settings, topic) defer publisher.Stop() @@ -176,7 +177,7 @@ func publishMessages(t *testing.T, settings PublishSettings, topic pubsublite.To waitForPublishResults(t, pubResults) } -func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic pubsublite.TopicPath, msgPrefix string, messageCount int) []string { +func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgPrefix string, messageCount int) []string { ctx := context.Background() publisher := publisherClient(ctx, t, settings, topic) defer publisher.Stop() @@ -220,7 +221,7 @@ func messageDiff(got, want *pubsub.Message) string { type checkOrdering bool -func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription pubsublite.SubscriptionPath, checkOrder checkOrdering) { +func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription wire.SubscriptionPath, checkOrder checkOrdering) { cctx, stopSubscriber := context.WithTimeout(context.Background(), defaultTestTimeout) orderingValidator := test.NewOrderingReceiver() @@ -251,7 +252,7 @@ func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings Rece } } -func receiveAndVerifyMessage(t *testing.T, want *pubsub.Message, settings ReceiveSettings, subscription pubsublite.SubscriptionPath) { +func receiveAndVerifyMessage(t *testing.T, want *pubsub.Message, settings ReceiveSettings, subscription wire.SubscriptionPath) { cctx, stopSubscriber := context.WithTimeout(context.Background(), defaultTestTimeout) messageReceiver := func(ctx context.Context, got *pubsub.Message) { @@ -637,7 +638,7 @@ func TestIntegration_SubscribeFanOut(t *testing.T) { createTopic(ctx, t, admin, topicPath, partitionCount) defer cleanUpTopic(ctx, t, admin, topicPath) - var subscriptionPaths []pubsublite.SubscriptionPath + var subscriptionPaths []wire.SubscriptionPath for i := 0; i < subscriberCount; i++ { subscription := baseSubscriptionPath subscription.SubscriptionID += fmt.Sprintf("%s-%d", baseSubscriptionPath.SubscriptionID, i) diff --git a/pubsublite/ps/publisher.go b/pubsublite/ps/publisher.go index ead99913551..8d7799e7e05 100644 --- a/pubsublite/ps/publisher.go +++ b/pubsublite/ps/publisher.go @@ -18,7 +18,6 @@ import ( "sync" "cloud.google.com/go/pubsub" - "cloud.google.com/go/pubsublite" "cloud.google.com/go/pubsublite/internal/wire" "cloud.google.com/go/pubsublite/publish" "golang.org/x/xerrors" @@ -55,7 +54,7 @@ func translateError(err error) error { return err } -// PublisherClient is a Cloud Pub/Sub Lite client to publish messages to a given +// PublisherClient is a Pub/Sub Lite client to publish messages to a given // topic. A PublisherClient is safe to use from multiple goroutines. // // See https://cloud.google.com/pubsub/lite/docs/publishing for more information @@ -69,13 +68,15 @@ type PublisherClient struct { err error } -// NewPublisherClient creates a new Cloud Pub/Sub Lite client to publish -// messages to a given topic. -// -// See https://cloud.google.com/pubsub/lite/docs/publishing for more information -// about publishing. -func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pubsublite.TopicPath, opts ...option.ClientOption) (*PublisherClient, error) { - region, err := pubsublite.ZoneToRegion(topic.Zone) +// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to a +// given topic. A valid topic path has the format: +// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". +func NewPublisherClient(ctx context.Context, settings PublishSettings, topic string, opts ...option.ClientOption) (*PublisherClient, error) { + topicPath, err := wire.ParseTopicPath(topic) + if err != nil { + return nil, err + } + region, err := wire.ZoneToRegion(topicPath.Zone) if err != nil { return nil, err } @@ -83,7 +84,7 @@ func NewPublisherClient(ctx context.Context, settings PublishSettings, topic pub // Note: ctx is not used to create the wire publisher, because if it is // cancelled, the publisher will not be able to perform graceful shutdown // (e.g. flush pending messages). - wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic.String(), opts...) + wirePub, err := wire.NewPublisher(context.Background(), settings.toWireSettings(), region, topic, opts...) if err != nil { return nil, err } diff --git a/pubsublite/ps/subscriber.go b/pubsublite/ps/subscriber.go index 8e58ba9d119..ef495f21bb2 100644 --- a/pubsublite/ps/subscriber.go +++ b/pubsublite/ps/subscriber.go @@ -19,7 +19,6 @@ import ( "sync" "cloud.google.com/go/pubsub" - "cloud.google.com/go/pubsublite" "cloud.google.com/go/pubsublite/internal/wire" "google.golang.org/api/option" @@ -78,7 +77,7 @@ type wireSubscriberFactory interface { type wireSubscriberFactoryImpl struct { settings wire.ReceiveSettings region string - subscription pubsublite.SubscriptionPath + subscription wire.SubscriptionPath options []option.ClientOption } @@ -209,7 +208,7 @@ func (si *subscriberInstance) Wait(ctx context.Context) error { return err } -// MessageReceiverFunc handles messages sent by the Cloud Pub/Sub Lite service. +// MessageReceiverFunc handles messages sent by the Pub/Sub Lite service. // // The implementation must arrange for pubsub.Message.Ack() or // pubsub.Message.Nack() to be called after processing the message. @@ -220,8 +219,8 @@ func (si *subscriberInstance) Wait(ctx context.Context) error { // callback will block the delivery of subsequent messages for the partition. type MessageReceiverFunc func(context.Context, *pubsub.Message) -// SubscriberClient is a Cloud Pub/Sub Lite client to receive messages for a -// given subscription. +// SubscriberClient is a Pub/Sub Lite client to receive messages for a given +// subscription. // // See https://cloud.google.com/pubsub/lite/docs/subscribing for more // information about receiving messages. @@ -234,20 +233,22 @@ type SubscriberClient struct { receiveActive bool } -// NewSubscriberClient creates a new Cloud Pub/Sub Lite client to receive -// messages for a given subscription. -// -// See https://cloud.google.com/pubsub/lite/docs/subscribing for more -// information about receiving messages. -func NewSubscriberClient(ctx context.Context, settings ReceiveSettings, subscription pubsublite.SubscriptionPath, opts ...option.ClientOption) (*SubscriberClient, error) { - region, err := pubsublite.ZoneToRegion(subscription.Zone) +// NewSubscriberClient creates a new Pub/Sub Lite client to receive messages for +// a given subscription. A valid subscription path has the format: +// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". +func NewSubscriberClient(ctx context.Context, settings ReceiveSettings, subscription string, opts ...option.ClientOption) (*SubscriberClient, error) { + subscriptionPath, err := wire.ParseSubscriptionPath(subscription) + if err != nil { + return nil, err + } + region, err := wire.ZoneToRegion(subscriptionPath.Zone) if err != nil { return nil, err } factory := &wireSubscriberFactoryImpl{ settings: settings.toWireSettings(), region: region, - subscription: subscription, + subscription: subscriptionPath, options: opts, } subClient := &SubscriberClient{ diff --git a/pubsublite/types.go b/pubsublite/types.go deleted file mode 100644 index dfb2b56438d..00000000000 --- a/pubsublite/types.go +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and - -package pubsublite - -import ( - "fmt" - "regexp" - "strings" - - "cloud.google.com/go/pubsublite/internal/wire" -) - -// LocationPath stores a path consisting of a project and zone. -type LocationPath struct { - // A Google Cloud project. The project ID (e.g. "my-project") or the project - // number (e.g. "987654321") can be provided. - Project string - - // A Google Cloud zone, for example "us-central1-a". - // See https://cloud.google.com/pubsub/lite/docs/locations for the list of - // zones where Cloud Pub/Sub Lite is available. - Zone string -} - -func (l LocationPath) String() string { - return fmt.Sprintf("projects/%s/locations/%s", l.Project, l.Zone) -} - -// TopicPath stores the full path of a Cloud Pub/Sub Lite topic. -// See https://cloud.google.com/pubsub/lite/docs/topics for more information. -type TopicPath struct { - // A Google Cloud project. The project ID (e.g. "my-project") or the project - // number (e.g. "987654321") can be provided. - Project string - - // A Google Cloud zone, for example "us-central1-a". - // See https://cloud.google.com/pubsub/lite/docs/locations for the list of - // zones where Cloud Pub/Sub Lite is available. - Zone string - - // The ID of the Cloud Pub/Sub Lite topic, for example "my-topic-name". - // See https://cloud.google.com/pubsub/docs/admin#resource_names for more - // information. - TopicID string -} - -func (t TopicPath) String() string { - return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID) -} - -func (t TopicPath) location() LocationPath { - return LocationPath{Project: t.Project, Zone: t.Zone} -} - -var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`) - -// parseTopicPath parses the full path of a Cloud Pub/Sub Lite topic, which -// should have the format: `projects/{project}/locations/{zone}/topics/{id}`. -func parseTopicPath(input string) (TopicPath, error) { - parts := topicPathRE.FindStringSubmatch(input) - if len(parts) < 4 { - return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q", input) - } - return TopicPath{Project: parts[1], Zone: parts[2], TopicID: parts[3]}, nil -} - -// SubscriptionPath stores the full path of a Cloud Pub/Sub Lite subscription. -// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more -// information. -type SubscriptionPath struct { - // A Google Cloud project. The project ID (e.g. "my-project") or the project - // number (e.g. "987654321") can be provided. - Project string - - // A Google Cloud zone. An example zone is "us-central1-a". - // See https://cloud.google.com/pubsub/lite/docs/locations for the list of - // zones where Cloud Pub/Sub Lite is available. - Zone string - - // The ID of the Cloud Pub/Sub Lite subscription, for example - // "my-subscription-name". - // See https://cloud.google.com/pubsub/docs/admin#resource_names for more - // information. - SubscriptionID string -} - -func (s SubscriptionPath) String() string { - return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID) -} - -func (s SubscriptionPath) location() LocationPath { - return LocationPath{Project: s.Project, Zone: s.Zone} -} - -var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`) - -// parseSubscriptionPath parses the full path of a Cloud Pub/Sub Lite -// subscription, which should have the format: -// `projects/{project}/locations/{zone}/subscriptions/{id}`. -func parseSubscriptionPath(input string) (SubscriptionPath, error) { - parts := subsPathRE.FindStringSubmatch(input) - if len(parts) < 4 { - return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q", input) - } - return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil -} - -// ZoneToRegion returns the region that the given zone is in. -func ZoneToRegion(zone string) (string, error) { - if err := wire.ValidateZone(zone); err != nil { - return "", err - } - return zone[0:strings.LastIndex(zone, "-")], nil -} diff --git a/pubsublite/types_test.go b/pubsublite/types_test.go deleted file mode 100644 index fcd89af79c4..00000000000 --- a/pubsublite/types_test.go +++ /dev/null @@ -1,163 +0,0 @@ -// Copyright 2020 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and - -package pubsublite - -import "testing" - -func TestParseTopicPath(t *testing.T) { - for _, tc := range []struct { - desc string - input string - wantPath TopicPath - wantErr bool - }{ - { - desc: "valid: topic path", - input: "projects/987654321/locations/europe-west1-d/topics/my-topic", - wantPath: TopicPath{Project: "987654321", Zone: "europe-west1-d", TopicID: "my-topic"}, - }, - { - desc: "invalid: zone", - input: "europe-west1-d", - wantErr: true, - }, - { - desc: "invalid: subscription path", - input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs", - wantErr: true, - }, - { - desc: "invalid: missing project", - input: "projects//locations/europe-west1-d/topics/my-topic", - wantErr: true, - }, - { - desc: "invalid: missing zone", - input: "projects/987654321/locations//topics/my-topic", - wantErr: true, - }, - { - desc: "invalid: missing topic id", - input: "projects/987654321/locations/europe-west1-d/topics/", - wantErr: true, - }, - { - desc: "invalid: has prefix", - input: "prefix/projects/987654321/locations/europe-west1-d/topics/my-topic", - wantErr: true, - }, - { - desc: "invalid: has suffix", - input: "projects/my-project/locations/us-west1-b/topics/my-topic/subresource/desc", - wantErr: true, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - gotPath, gotErr := parseTopicPath(tc.input) - if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { - t.Errorf("parseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) - } - }) - } -} - -func TestParseSubscriptionPath(t *testing.T) { - for _, tc := range []struct { - desc string - input string - wantPath SubscriptionPath - wantErr bool - }{ - { - desc: "valid: subscription path", - input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs", - wantPath: SubscriptionPath{Project: "987654321", Zone: "europe-west1-d", SubscriptionID: "my-subs"}, - }, - { - desc: "invalid: zone", - input: "europe-west1-d", - wantErr: true, - }, - { - desc: "invalid: topic path", - input: "projects/987654321/locations/europe-west1-d/topics/my-topic", - wantErr: true, - }, - { - desc: "invalid: missing project", - input: "projects//locations/europe-west1-d/subscriptions/my-subs", - wantErr: true, - }, - { - desc: "invalid: missing zone", - input: "projects/987654321/locations//subscriptions/my-subs", - wantErr: true, - }, - { - desc: "invalid: missing subscription id", - input: "projects/987654321/locations/europe-west1-d/subscriptions/", - wantErr: true, - }, - { - desc: "invalid: has prefix", - input: "prefix/projects/987654321/locations/europe-west1-d/subscriptions/my-subs", - wantErr: true, - }, - { - desc: "invalid: has suffix", - input: "projects/my-project/locations/us-west1-b/subscriptions/my-subs/subresource/desc", - wantErr: true, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - gotPath, gotErr := parseSubscriptionPath(tc.input) - if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { - t.Errorf("parseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) - } - }) - } -} - -func TestZoneToRegion(t *testing.T) { - for _, tc := range []struct { - desc string - zone string - wantRegion string - wantErr bool - }{ - { - desc: "valid", - zone: "europe-west1-d", - wantRegion: "europe-west1", - wantErr: false, - }, - { - desc: "invalid: insufficient dashes", - zone: "europe-west1", - wantErr: true, - }, - { - desc: "invalid: no dashes", - zone: "europewest1", - wantErr: true, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - gotRegion, gotErr := ZoneToRegion(tc.zone) - if gotRegion != tc.wantRegion || (gotErr != nil) != tc.wantErr { - t.Errorf("ZoneToRegion(%q) = (%v, %v), want (%v, err=%v)", tc.zone, gotRegion, gotErr, tc.wantRegion, tc.wantErr) - } - }) - } -}