From 749473ead30bf1872634821d3238d1299b99acc6 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Mon, 26 Oct 2020 21:22:16 +1100 Subject: [PATCH] feat(pubsublite): Pub/Sub Lite admin client (#3036) Implements pubsublite.Client, which wraps the Pub/Sub Lite Admin Service. Includes integration tests for admin operations. --- CONTRIBUTING.md | 1 + pubsublite/admin.go | 208 +++++++++++++++++++++++++ pubsublite/config.go | 62 ++++---- pubsublite/config_test.go | 1 + pubsublite/integration_test.go | 272 +++++++++++++++++++++++++++++++++ pubsublite/types.go | 28 ++-- pubsublite/types_test.go | 16 +- 7 files changed, 535 insertions(+), 53 deletions(-) create mode 100644 pubsublite/admin.go create mode 100644 pubsublite/integration_test.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3a90efec002..e1398722c3e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -104,6 +104,7 @@ Next, ensure the following APIs are enabled in the general project: - Google Compute Engine Instance Groups API - Kubernetes Engine API - Cloud Error Reporting API +- Pub/Sub Lite API Next, create a Datastore database in the general project, and a Firestore database in the Firestore project. diff --git a/pubsublite/admin.go b/pubsublite/admin.go new file mode 100644 index 00000000000..4195c3f25f7 --- /dev/null +++ b/pubsublite/admin.go @@ -0,0 +1,208 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "context" + + "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" + + vkit "cloud.google.com/go/pubsublite/apiv1" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// AdminClient provides admin operations for Google Pub/Sub Lite resources +// within a Google Cloud region. An AdminClient may be shared by multiple +// goroutines. +type AdminClient struct { + admin *vkit.AdminClient +} + +// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin +// operations for resources within a given region. +// See https://cloud.google.com/pubsub/lite/docs/locations for the list of +// regions and zones where Google Pub/Sub Lite is available. +func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) { + if err := validateRegion(region); err != nil { + return nil, err + } + options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")} + options = append(options, opts...) + admin, err := vkit.NewAdminClient(ctx, options...) + if err != nil { + return nil, err + } + return &AdminClient{admin: admin}, nil +} + +// CreateTopic creates a new topic from the given config. +func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) { + req := &pb.CreateTopicRequest{ + Parent: config.Name.location().String(), + Topic: config.toProto(), + TopicId: config.Name.TopicID, + } + topicpb, err := ac.admin.CreateTopic(ctx, req) + if err != nil { + return nil, err + } + return protoToTopicConfig(topicpb) +} + +// UpdateTopic updates an existing topic from the given config and returns the +// new topic config. +func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) { + topicpb, err := ac.admin.UpdateTopic(ctx, config.toUpdateRequest()) + if err != nil { + return nil, err + } + return protoToTopicConfig(topicpb) +} + +// DeleteTopic deletes a topic. +func (ac *AdminClient) DeleteTopic(ctx context.Context, topic TopicPath) error { + return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic.String()}) +} + +// Topic retrieves the configuration of a topic. +func (ac *AdminClient) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, error) { + topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic.String()}) + if err != nil { + return nil, err + } + return protoToTopicConfig(topicpb) +} + +// TopicPartitions returns the number of partitions for a topic. +func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) { + partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()}) + if err != nil { + return 0, err + } + return int(partitions.GetPartitionCount()), nil +} + +// TopicSubscriptions retrieves the list of subscription paths for a topic. +func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic TopicPath) (*SubscriptionPathIterator, error) { + subsPathIt := ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic.String()}) + return &SubscriptionPathIterator{it: subsPathIt}, nil +} + +// Topics retrieves the list of topic configs for a given project and zone. +func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *TopicIterator { + return &TopicIterator{ + it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: location.String()}), + } +} + +// CreateSubscription creates a new subscription from the given config. +func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) { + req := &pb.CreateSubscriptionRequest{ + Parent: config.Name.location().String(), + Subscription: config.toProto(), + SubscriptionId: config.Name.SubscriptionID, + } + subspb, err := ac.admin.CreateSubscription(ctx, req) + if err != nil { + return nil, err + } + return protoToSubscriptionConfig(subspb) +} + +// UpdateSubscription updates an existing subscription from the given config and +// returns the new subscription config. +func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) { + subspb, err := ac.admin.UpdateSubscription(ctx, config.toUpdateRequest()) + if err != nil { + return nil, err + } + return protoToSubscriptionConfig(subspb) +} + +// DeleteSubscription deletes a subscription. +func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription SubscriptionPath) error { + return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription.String()}) +} + +// Subscription retrieves the configuration of a subscription. +func (ac *AdminClient) Subscription(ctx context.Context, subscription SubscriptionPath) (*SubscriptionConfig, error) { + subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription.String()}) + if err != nil { + return nil, err + } + return protoToSubscriptionConfig(subspb) +} + +// Subscriptions retrieves the list of subscription configs for a given project +// and zone. +func (ac *AdminClient) Subscriptions(ctx context.Context, location LocationPath) *SubscriptionIterator { + return &SubscriptionIterator{ + it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: location.String()}), + } +} + +// Close releases any resources held by the client when it is no longer +// required. If the client is available for the lifetime of the program, then +// Close need not be called at exit. +func (ac *AdminClient) Close() error { + return ac.admin.Close() +} + +// TopicIterator is an iterator that returns a list of topic configs. +type TopicIterator struct { + it *vkit.TopicIterator +} + +// Next returns the next topic config. The second return value will be +// iterator.Done if there are no more topic configs. +func (t *TopicIterator) Next() (*TopicConfig, error) { + topicpb, err := t.it.Next() + if err != nil { + return nil, err + } + return protoToTopicConfig(topicpb) +} + +// SubscriptionIterator is an iterator that returns a list of subscription +// configs. +type SubscriptionIterator struct { + it *vkit.SubscriptionIterator +} + +// Next returns the next subscription config. The second return value will be +// iterator.Done if there are no more subscription configs. +func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) { + subspb, err := s.it.Next() + if err != nil { + return nil, err + } + return protoToSubscriptionConfig(subspb) +} + +// SubscriptionPathIterator is an iterator that returns a list of subscription +// paths. +type SubscriptionPathIterator struct { + it *vkit.StringIterator +} + +// Next returns the next subscription path. The second return value will be +// iterator.Done if there are no more subscription paths. +func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) { + subsPath, err := sp.it.Next() + if err != nil { + return SubscriptionPath{}, err + } + return parseSubscriptionPath(subsPath) +} diff --git a/pubsublite/config.go b/pubsublite/config.go index 4225900de4c..e115c515c87 100644 --- a/pubsublite/config.go +++ b/pubsublite/config.go @@ -24,6 +24,11 @@ import ( fmpb "google.golang.org/genproto/protobuf/field_mask" ) +// InfiniteRetention is a sentinel used in topic configs to denote an infinite +// retention duration (i.e. retain messages as long as there is available +// storage). +const InfiniteRetention = time.Duration(-1) + // TopicConfig describes the properties of a Google Pub/Sub Lite topic. // See https://cloud.google.com/pubsub/lite/docs/topics for more information // about how topics are configured. @@ -33,37 +38,37 @@ type TopicConfig struct { // The number of partitions in the topic. Must be at least 1. Cannot be // changed after creation. - PartitionCount int64 + PartitionCount int // Publish throughput capacity per partition in MiB/s. // Must be >= 4 and <= 16. - PublishCapacityMiBPerSec int32 + PublishCapacityMiBPerSec int // Subscribe throughput capacity per partition in MiB/s. // Must be >= 4 and <= 32. - SubscribeCapacityMiBPerSec int32 + SubscribeCapacityMiBPerSec int // The provisioned storage, in bytes, per partition. If the number of bytes // stored in any of the topic's partitions grows beyond this value, older // messages will be dropped to make room for newer ones, regardless of the - // value of `RetentionDuration`. + // value of `RetentionDuration`. Must be > 0. PerPartitionBytes int64 - // How long a published message is retained. If unset, messages will be - // retained as long as the bytes retained for each partition is below - // `PerPartitionBytes`. - RetentionDuration optional.Duration + // How long a published message is retained. If set to `InfiniteRetention`, + // messages will be retained as long as the bytes retained for each partition + // is below `PerPartitionBytes`. Otherwise, must be > 0. + RetentionDuration time.Duration } func (tc *TopicConfig) toProto() *pb.Topic { topicpb := &pb.Topic{ Name: tc.Name.String(), PartitionConfig: &pb.Topic_PartitionConfig{ - Count: tc.PartitionCount, + Count: int64(tc.PartitionCount), Dimension: &pb.Topic_PartitionConfig_Capacity_{ Capacity: &pb.Topic_PartitionConfig_Capacity{ - PublishMibPerSec: tc.PublishCapacityMiBPerSec, - SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec, + PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec), + SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec), }, }, }, @@ -71,17 +76,14 @@ func (tc *TopicConfig) toProto() *pb.Topic { PerPartitionBytes: tc.PerPartitionBytes, }, } - if tc.RetentionDuration != nil { - duration := optional.ToDuration(tc.RetentionDuration) - if duration >= 0 { - topicpb.RetentionConfig.Period = ptypes.DurationProto(duration) - } + if tc.RetentionDuration >= 0 { + topicpb.RetentionConfig.Period = ptypes.DurationProto(tc.RetentionDuration) } return topicpb } func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { - name, err := ParseTopicPath(t.GetName()) + name, err := parseTopicPath(t.GetName()) if err != nil { return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName()) } @@ -90,10 +92,11 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { retentionCfg := t.GetRetentionConfig() topic := &TopicConfig{ Name: name, - PartitionCount: partitionCfg.GetCount(), - PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(), - SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(), + PartitionCount: int(partitionCfg.GetCount()), + PublishCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetPublishMibPerSec()), + SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()), PerPartitionBytes: retentionCfg.GetPerPartitionBytes(), + RetentionDuration: InfiniteRetention, } // An unset retention period proto denotes "infinite retention". if retentionCfg.Period != nil { @@ -106,28 +109,23 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) { return topic, nil } -// InfiniteRetention is sentinel used when updating topic configs to clear a -// retention duration (i.e. retain messages as long as there is available -// storage). -const InfiniteRetention = time.Duration(-1) - // TopicConfigToUpdate specifies the properties to update for a topic. type TopicConfigToUpdate struct { // The full path of the topic to update. Required. Name TopicPath // If non-zero, will update the publish throughput capacity per partition. - PublishCapacityMiBPerSec int32 + PublishCapacityMiBPerSec int // If non-zero, will update the subscribe throughput capacity per partition. - SubscribeCapacityMiBPerSec int32 + SubscribeCapacityMiBPerSec int // If non-zero, will update the provisioned storage per partition. PerPartitionBytes int64 // If specified, will update how long a published message is retained. To // clear a retention duration (i.e. retain messages as long as there is - // available storage), set this to `pubsublite.InfiniteRetention`. + // available storage), set this to `InfiniteRetention`. RetentionDuration optional.Duration } @@ -137,8 +135,8 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest { PartitionConfig: &pb.Topic_PartitionConfig{ Dimension: &pb.Topic_PartitionConfig_Capacity_{ Capacity: &pb.Topic_PartitionConfig_Capacity{ - PublishMibPerSec: tc.PublishCapacityMiBPerSec, - SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec, + PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec), + SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec), }, }, }, @@ -219,11 +217,11 @@ func (sc *SubscriptionConfig) toProto() *pb.Subscription { } func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) { - name, err := ParseSubscriptionPath(s.GetName()) + name, err := parseSubscriptionPath(s.GetName()) if err != nil { return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName()) } - topic, err := ParseTopicPath(s.GetTopic()) + topic, err := parseTopicPath(s.GetTopic()) if err != nil { return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic()) } diff --git a/pubsublite/config_test.go b/pubsublite/config_test.go index b9d8ec80b81..9ec387b73d9 100644 --- a/pubsublite/config_test.go +++ b/pubsublite/config_test.go @@ -92,6 +92,7 @@ func TestTopicConfigToProtoConversion(t *testing.T) { PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 8, PerPartitionBytes: 4294967296, + RetentionDuration: InfiniteRetention, }, }, { diff --git a/pubsublite/integration_test.go b/pubsublite/integration_test.go new file mode 100644 index 00000000000..3568f591819 --- /dev/null +++ b/pubsublite/integration_test.go @@ -0,0 +1,272 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "context" + "math/rand" + "testing" + "time" + + "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/internal/uid" + "google.golang.org/api/iterator" + "google.golang.org/api/option" + + vkit "cloud.google.com/go/pubsublite/apiv1" +) + +const gibi = 1 << 30 + +var ( + resourceIDs = uid.NewSpace("go-admin-test", nil) + rng *rand.Rand + + // A random zone is selected for each integration test run. + supportedZones = []string{ + "us-central1-a", + "us-central1-b", + "us-central1-c", + "europe-west1-b", + "europe-west1-d", + } +) + +func initIntegrationTest(t *testing.T) { + if testing.Short() { + t.Skip("Integration tests skipped in short mode") + } + if testutil.ProjID() == "" { + t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") + } + rng = testutil.NewRand(time.Now()) +} + +func withGRPCHeadersAssertion(t *testing.T, opts ...option.ClientOption) []option.ClientOption { + grpcHeadersEnforcer := &testutil.HeadersEnforcer{ + OnFailure: t.Errorf, + Checkers: []*testutil.HeaderChecker{ + testutil.XGoogClientHeaderChecker, + }, + } + return append(grpcHeadersEnforcer.CallOptions(), opts...) +} + +func adminClient(ctx context.Context, t *testing.T, region string, opts ...option.ClientOption) *AdminClient { + ts := testutil.TokenSource(ctx, vkit.DefaultAuthScopes()...) + if ts == nil { + t.Skip("Integration tests skipped. See CONTRIBUTING.md for details") + } + opts = append(withGRPCHeadersAssertion(t, option.WithTokenSource(ts)), opts...) + admin, err := NewAdminClient(ctx, region, opts...) + if err != nil { + t.Fatalf("Failed to create admin client: %v", err) + } + return admin +} + +func cleanUpTopic(ctx context.Context, t *testing.T, admin *AdminClient, name TopicPath) { + if err := admin.DeleteTopic(ctx, name); err != nil { + t.Errorf("Failed to delete topic %s: %v", name, err) + } +} + +func cleanUpSubscription(ctx context.Context, t *testing.T, admin *AdminClient, name SubscriptionPath) { + if err := admin.DeleteSubscription(ctx, name); err != nil { + t.Errorf("Failed to delete subscription %s: %v", name, err) + } +} + +func randomLiteZone() string { + return supportedZones[rng.Intn(len(supportedZones))] +} + +func TestResourceAdminOperations(t *testing.T) { + initIntegrationTest(t) + + ctx := context.Background() + proj := testutil.ProjID() + zone := randomLiteZone() + region, _ := ZoneToRegion(zone) + resourceID := resourceIDs.New() + + locationPath := LocationPath{Project: proj, Zone: zone} + topicPath := TopicPath{Project: proj, Zone: zone, TopicID: resourceID} + subscriptionPath := SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID} + t.Logf("Topic path: %s", topicPath) + + admin := adminClient(ctx, t, region) + defer admin.Close() + + // Topic admin operations. + newTopicConfig := &TopicConfig{ + Name: topicPath, + PartitionCount: 2, + PublishCapacityMiBPerSec: 4, + SubscribeCapacityMiBPerSec: 4, + PerPartitionBytes: 30 * gibi, + RetentionDuration: time.Duration(24 * time.Hour), + } + + gotTopicConfig, err := admin.CreateTopic(ctx, *newTopicConfig) + if err != nil { + t.Fatalf("Failed to create topic: %v", err) + } + defer cleanUpTopic(ctx, t, admin, topicPath) + if diff := testutil.Diff(gotTopicConfig, newTopicConfig); diff != "" { + t.Errorf("CreateTopic() got: -, want: +\n%s", diff) + } + + if gotTopicConfig, err := admin.Topic(ctx, topicPath); err != nil { + t.Errorf("Failed to get topic: %v", err) + } else if diff := testutil.Diff(gotTopicConfig, newTopicConfig); diff != "" { + t.Errorf("Topic() got: -, want: +\n%s", diff) + } + + if gotTopicPartitions, err := admin.TopicPartitions(ctx, topicPath); err != nil { + t.Errorf("Failed to get topic partitions: %v", err) + } else if gotTopicPartitions != newTopicConfig.PartitionCount { + t.Errorf("TopicPartitions() got: %v, want: %v", gotTopicPartitions, newTopicConfig.PartitionCount) + } + + topicIt := admin.Topics(ctx, locationPath) + var foundTopic *TopicConfig + for { + topic, err := topicIt.Next() + if err == iterator.Done { + break + } + if testutil.Equal(topic.Name, topicPath) { + foundTopic = topic + break + } + } + if foundTopic == nil { + t.Error("Topics() did not return topic config") + } else if diff := testutil.Diff(foundTopic, newTopicConfig); diff != "" { + t.Errorf("Topics() found config: -, want: +\n%s", diff) + } + + topicUpdate1 := TopicConfigToUpdate{ + Name: topicPath, + PublishCapacityMiBPerSec: 6, + SubscribeCapacityMiBPerSec: 8, + } + wantUpdatedTopicConfig1 := &TopicConfig{ + Name: topicPath, + PartitionCount: 2, + PublishCapacityMiBPerSec: 6, + SubscribeCapacityMiBPerSec: 8, + PerPartitionBytes: 30 * gibi, + RetentionDuration: time.Duration(24 * time.Hour), + } + if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate1); err != nil { + t.Errorf("Failed to update topic: %v", err) + } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig1); diff != "" { + t.Errorf("UpdateTopic() got: -, want: +\n%s", diff) + } + + topicUpdate2 := TopicConfigToUpdate{ + Name: topicPath, + PerPartitionBytes: 35 * gibi, + RetentionDuration: InfiniteRetention, + } + wantUpdatedTopicConfig2 := &TopicConfig{ + Name: topicPath, + PartitionCount: 2, + PublishCapacityMiBPerSec: 6, + SubscribeCapacityMiBPerSec: 8, + PerPartitionBytes: 35 * gibi, + RetentionDuration: InfiniteRetention, + } + if gotTopicConfig, err := admin.UpdateTopic(ctx, topicUpdate2); err != nil { + t.Errorf("Failed to update topic: %v", err) + } else if diff := testutil.Diff(gotTopicConfig, wantUpdatedTopicConfig2); diff != "" { + t.Errorf("UpdateTopic() got: -, want: +\n%s", diff) + } + + // Subscription admin operations. + newSubsConfig := &SubscriptionConfig{ + Name: subscriptionPath, + Topic: topicPath, + DeliveryRequirement: DeliverImmediately, + } + + gotSubsConfig, err := admin.CreateSubscription(ctx, *newSubsConfig) + if err != nil { + t.Fatalf("Failed to create subscription: %v", err) + } + defer cleanUpSubscription(ctx, t, admin, subscriptionPath) + if diff := testutil.Diff(gotSubsConfig, newSubsConfig); diff != "" { + t.Errorf("CreateSubscription() got: -, want: +\n%s", diff) + } + + if gotSubsConfig, err := admin.Subscription(ctx, subscriptionPath); err != nil { + t.Errorf("Failed to get subscription: %v", err) + } else if diff := testutil.Diff(gotSubsConfig, newSubsConfig); diff != "" { + t.Errorf("Subscription() got: -, want: +\n%s", diff) + } + + subsIt := admin.Subscriptions(ctx, locationPath) + var foundSubs *SubscriptionConfig + for { + subs, err := subsIt.Next() + if err == iterator.Done { + break + } + if testutil.Equal(subs.Name, subscriptionPath) { + foundSubs = subs + break + } + } + if foundSubs == nil { + t.Error("Subscriptions() did not return subscription config") + } else if diff := testutil.Diff(foundSubs, gotSubsConfig); diff != "" { + t.Errorf("Subscriptions() found config: -, want: +\n%s", diff) + } + + if subsPathIt, err := admin.TopicSubscriptions(ctx, topicPath); err != nil { + t.Errorf("Failed to list topic subscriptions: %v", err) + } else { + foundSubsPath := false + for { + subsPath, err := subsPathIt.Next() + if err == iterator.Done { + break + } + if testutil.Equal(subsPath, subscriptionPath) { + foundSubsPath = true + break + } + } + if !foundSubsPath { + t.Error("TopicSubscriptions() did not return subscription path") + } + } + + subsUpdate := SubscriptionConfigToUpdate{ + Name: subscriptionPath, + DeliveryRequirement: DeliverAfterStored, + } + wantUpdatedSubsConfig := &SubscriptionConfig{ + Name: subscriptionPath, + Topic: topicPath, + DeliveryRequirement: DeliverAfterStored, + } + if gotSubsConfig, err := admin.UpdateSubscription(ctx, subsUpdate); err != nil { + t.Errorf("Failed to update subscription: %v", err) + } else if diff := testutil.Diff(gotSubsConfig, wantUpdatedSubsConfig); diff != "" { + t.Errorf("UpdateSubscription() got: -, want: +\n%s", diff) + } +} diff --git a/pubsublite/types.go b/pubsublite/types.go index a120c49452c..40104d2d3c7 100644 --- a/pubsublite/types.go +++ b/pubsublite/types.go @@ -48,6 +48,8 @@ type TopicPath struct { Zone string // The ID of the Google Pub/Sub Lite topic, for example "my-topic-name". + // See https://cloud.google.com/pubsub/docs/admin#resource_names for more + // information. TopicID string } @@ -55,16 +57,15 @@ func (t TopicPath) String() string { return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID) } -// Location returns the path of the parent location. -func (t TopicPath) Location() LocationPath { +func (t TopicPath) location() LocationPath { return LocationPath{Project: t.Project, Zone: t.Zone} } var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`) -// ParseTopicPath parses the full path of a Google Pub/Sub Lite topic, which +// parseTopicPath parses the full path of a Google Pub/Sub Lite topic, which // should have the format: `projects/{project}/locations/{zone}/topics/{id}`. -func ParseTopicPath(input string) (TopicPath, error) { +func parseTopicPath(input string) (TopicPath, error) { parts := topicPathRE.FindStringSubmatch(input) if len(parts) < 4 { return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q", input) @@ -87,6 +88,8 @@ type SubscriptionPath struct { // The ID of the Google Pub/Sub Lite subscription, for example // "my-subscription-name". + // See https://cloud.google.com/pubsub/docs/admin#resource_names for more + // information. SubscriptionID string } @@ -94,17 +97,16 @@ func (s SubscriptionPath) String() string { return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID) } -// Location returns the path of the parent location. -func (s SubscriptionPath) Location() LocationPath { +func (s SubscriptionPath) location() LocationPath { return LocationPath{Project: s.Project, Zone: s.Zone} } var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`) -// ParseSubscriptionPath parses the full path of a Google Pub/Sub Lite +// parseSubscriptionPath parses the full path of a Google Pub/Sub Lite // subscription, which should have the format: // `projects/{project}/locations/{zone}/subscriptions/{id}`. -func ParseSubscriptionPath(input string) (SubscriptionPath, error) { +func parseSubscriptionPath(input string) (SubscriptionPath, error) { parts := subsPathRE.FindStringSubmatch(input) if len(parts) < 4 { return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q", input) @@ -112,10 +114,10 @@ func ParseSubscriptionPath(input string) (SubscriptionPath, error) { return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil } -// ValidateZone verifies that the `input` string has the format of a valid +// validateZone verifies that the `input` string has the format of a valid // Google Cloud zone. An example zone is "europe-west1-b". // See https://cloud.google.com/compute/docs/regions-zones for more information. -func ValidateZone(input string) error { +func validateZone(input string) error { parts := strings.Split(input, "-") if len(parts) != 3 { return fmt.Errorf("pubsublite: invalid zone %q", input) @@ -123,10 +125,10 @@ func ValidateZone(input string) error { return nil } -// ValidateRegion verifies that the `input` string has the format of a valid +// validateRegion verifies that the `input` string has the format of a valid // Google Cloud region. An example region is "europe-west1". // See https://cloud.google.com/compute/docs/regions-zones for more information. -func ValidateRegion(input string) error { +func validateRegion(input string) error { parts := strings.Split(input, "-") if len(parts) != 2 { return fmt.Errorf("pubsublite: invalid region %q", input) @@ -136,7 +138,7 @@ func ValidateRegion(input string) error { // ZoneToRegion returns the region that the given zone is in. func ZoneToRegion(zone string) (string, error) { - if err := ValidateZone(zone); err != nil { + if err := validateZone(zone); err != nil { return "", err } return zone[0:strings.LastIndex(zone, "-")], nil diff --git a/pubsublite/types_test.go b/pubsublite/types_test.go index c9a98716434..18f02b96ac6 100644 --- a/pubsublite/types_test.go +++ b/pubsublite/types_test.go @@ -64,9 +64,9 @@ func TestParseTopicPath(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - gotPath, gotErr := ParseTopicPath(tc.input) + gotPath, gotErr := parseTopicPath(tc.input) if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { - t.Errorf("ParseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) + t.Errorf("parseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) } }) } @@ -121,9 +121,9 @@ func TestParseSubscriptionPath(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - gotPath, gotErr := ParseSubscriptionPath(tc.input) + gotPath, gotErr := parseSubscriptionPath(tc.input) if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr { - t.Errorf("ParseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) + t.Errorf("parseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr) } }) } @@ -152,9 +152,9 @@ func TestValidateZone(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - err := ValidateZone(tc.input) + err := validateZone(tc.input) if (err != nil) != tc.wantErr { - t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr) + t.Errorf("validateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr) } }) } @@ -183,9 +183,9 @@ func TestValidateRegion(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - err := ValidateRegion(tc.input) + err := validateRegion(tc.input) if (err != nil) != tc.wantErr { - t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr) + t.Errorf("validateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr) } }) }