Skip to content

Commit

Permalink
feat(pubsublite): Pub/Sub Lite admin client (#3036)
Browse files Browse the repository at this point in the history
Implements pubsublite.Client, which wraps the Pub/Sub Lite Admin Service. Includes integration tests for admin operations.
  • Loading branch information
tmdiep committed Oct 26, 2020
1 parent 5130694 commit 749473e
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 53 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Expand Up @@ -104,6 +104,7 @@ Next, ensure the following APIs are enabled in the general project:
- Google Compute Engine Instance Groups API
- Kubernetes Engine API
- Cloud Error Reporting API
- Pub/Sub Lite API

Next, create a Datastore database in the general project, and a Firestore
database in the Firestore project.
Expand Down
208 changes: 208 additions & 0 deletions pubsublite/admin.go
@@ -0,0 +1,208 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"context"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AdminClient provides admin operations for Google Pub/Sub Lite resources
// within a Google Cloud region. An AdminClient may be shared by multiple
// goroutines.
type AdminClient struct {
admin *vkit.AdminClient
}

// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
// operations for resources within a given region.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Google Pub/Sub Lite is available.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")}
options = append(options, opts...)
admin, err := vkit.NewAdminClient(ctx, options...)
if err != nil {
return nil, err
}
return &AdminClient{admin: admin}, nil
}

// CreateTopic creates a new topic from the given config.
func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) {
req := &pb.CreateTopicRequest{
Parent: config.Name.location().String(),
Topic: config.toProto(),
TopicId: config.Name.TopicID,
}
topicpb, err := ac.admin.CreateTopic(ctx, req)
if err != nil {
return nil, err
}
return protoToTopicConfig(topicpb)
}

// UpdateTopic updates an existing topic from the given config and returns the
// new topic config.
func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) {
topicpb, err := ac.admin.UpdateTopic(ctx, config.toUpdateRequest())
if err != nil {
return nil, err
}
return protoToTopicConfig(topicpb)
}

// DeleteTopic deletes a topic.
func (ac *AdminClient) DeleteTopic(ctx context.Context, topic TopicPath) error {
return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic.String()})
}

// Topic retrieves the configuration of a topic.
func (ac *AdminClient) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, error) {
topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic.String()})
if err != nil {
return nil, err
}
return protoToTopicConfig(topicpb)
}

// TopicPartitions returns the number of partitions for a topic.
func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) {
partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()})
if err != nil {
return 0, err
}
return int(partitions.GetPartitionCount()), nil
}

// TopicSubscriptions retrieves the list of subscription paths for a topic.
func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic TopicPath) (*SubscriptionPathIterator, error) {
subsPathIt := ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic.String()})
return &SubscriptionPathIterator{it: subsPathIt}, nil
}

// Topics retrieves the list of topic configs for a given project and zone.
func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *TopicIterator {
return &TopicIterator{
it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: location.String()}),
}
}

// CreateSubscription creates a new subscription from the given config.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
req := &pb.CreateSubscriptionRequest{
Parent: config.Name.location().String(),
Subscription: config.toProto(),
SubscriptionId: config.Name.SubscriptionID,
}
subspb, err := ac.admin.CreateSubscription(ctx, req)
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
}

// UpdateSubscription updates an existing subscription from the given config and
// returns the new subscription config.
func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) {
subspb, err := ac.admin.UpdateSubscription(ctx, config.toUpdateRequest())
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
}

// DeleteSubscription deletes a subscription.
func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription SubscriptionPath) error {
return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription.String()})
}

// Subscription retrieves the configuration of a subscription.
func (ac *AdminClient) Subscription(ctx context.Context, subscription SubscriptionPath) (*SubscriptionConfig, error) {
subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription.String()})
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
}

// Subscriptions retrieves the list of subscription configs for a given project
// and zone.
func (ac *AdminClient) Subscriptions(ctx context.Context, location LocationPath) *SubscriptionIterator {
return &SubscriptionIterator{
it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: location.String()}),
}
}

// Close releases any resources held by the client when it is no longer
// required. If the client is available for the lifetime of the program, then
// Close need not be called at exit.
func (ac *AdminClient) Close() error {
return ac.admin.Close()
}

// TopicIterator is an iterator that returns a list of topic configs.
type TopicIterator struct {
it *vkit.TopicIterator
}

// Next returns the next topic config. The second return value will be
// iterator.Done if there are no more topic configs.
func (t *TopicIterator) Next() (*TopicConfig, error) {
topicpb, err := t.it.Next()
if err != nil {
return nil, err
}
return protoToTopicConfig(topicpb)
}

// SubscriptionIterator is an iterator that returns a list of subscription
// configs.
type SubscriptionIterator struct {
it *vkit.SubscriptionIterator
}

// Next returns the next subscription config. The second return value will be
// iterator.Done if there are no more subscription configs.
func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) {
subspb, err := s.it.Next()
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
}

// SubscriptionPathIterator is an iterator that returns a list of subscription
// paths.
type SubscriptionPathIterator struct {
it *vkit.StringIterator
}

// Next returns the next subscription path. The second return value will be
// iterator.Done if there are no more subscription paths.
func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) {
subsPath, err := sp.it.Next()
if err != nil {
return SubscriptionPath{}, err
}
return parseSubscriptionPath(subsPath)
}
62 changes: 30 additions & 32 deletions pubsublite/config.go
Expand Up @@ -24,6 +24,11 @@ import (
fmpb "google.golang.org/genproto/protobuf/field_mask"
)

// InfiniteRetention is a sentinel used in topic configs to denote an infinite
// retention duration (i.e. retain messages as long as there is available
// storage).
const InfiniteRetention = time.Duration(-1)

// TopicConfig describes the properties of a Google Pub/Sub Lite topic.
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
// about how topics are configured.
Expand All @@ -33,55 +38,52 @@ type TopicConfig struct {

// The number of partitions in the topic. Must be at least 1. Cannot be
// changed after creation.
PartitionCount int64
PartitionCount int

// Publish throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 16.
PublishCapacityMiBPerSec int32
PublishCapacityMiBPerSec int

// Subscribe throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 32.
SubscribeCapacityMiBPerSec int32
SubscribeCapacityMiBPerSec int

// The provisioned storage, in bytes, per partition. If the number of bytes
// stored in any of the topic's partitions grows beyond this value, older
// messages will be dropped to make room for newer ones, regardless of the
// value of `RetentionDuration`.
// value of `RetentionDuration`. Must be > 0.
PerPartitionBytes int64

// How long a published message is retained. If unset, messages will be
// retained as long as the bytes retained for each partition is below
// `PerPartitionBytes`.
RetentionDuration optional.Duration
// How long a published message is retained. If set to `InfiniteRetention`,
// messages will be retained as long as the bytes retained for each partition
// is below `PerPartitionBytes`. Otherwise, must be > 0.
RetentionDuration time.Duration
}

func (tc *TopicConfig) toProto() *pb.Topic {
topicpb := &pb.Topic{
Name: tc.Name.String(),
PartitionConfig: &pb.Topic_PartitionConfig{
Count: tc.PartitionCount,
Count: int64(tc.PartitionCount),
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
},
},
},
RetentionConfig: &pb.Topic_RetentionConfig{
PerPartitionBytes: tc.PerPartitionBytes,
},
}
if tc.RetentionDuration != nil {
duration := optional.ToDuration(tc.RetentionDuration)
if duration >= 0 {
topicpb.RetentionConfig.Period = ptypes.DurationProto(duration)
}
if tc.RetentionDuration >= 0 {
topicpb.RetentionConfig.Period = ptypes.DurationProto(tc.RetentionDuration)
}
return topicpb
}

func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
name, err := ParseTopicPath(t.GetName())
name, err := parseTopicPath(t.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName())
}
Expand All @@ -90,10 +92,11 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
retentionCfg := t.GetRetentionConfig()
topic := &TopicConfig{
Name: name,
PartitionCount: partitionCfg.GetCount(),
PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(),
SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(),
PartitionCount: int(partitionCfg.GetCount()),
PublishCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetPublishMibPerSec()),
SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()),
PerPartitionBytes: retentionCfg.GetPerPartitionBytes(),
RetentionDuration: InfiniteRetention,
}
// An unset retention period proto denotes "infinite retention".
if retentionCfg.Period != nil {
Expand All @@ -106,28 +109,23 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
return topic, nil
}

// InfiniteRetention is sentinel used when updating topic configs to clear a
// retention duration (i.e. retain messages as long as there is available
// storage).
const InfiniteRetention = time.Duration(-1)

// TopicConfigToUpdate specifies the properties to update for a topic.
type TopicConfigToUpdate struct {
// The full path of the topic to update. Required.
Name TopicPath

// If non-zero, will update the publish throughput capacity per partition.
PublishCapacityMiBPerSec int32
PublishCapacityMiBPerSec int

// If non-zero, will update the subscribe throughput capacity per partition.
SubscribeCapacityMiBPerSec int32
SubscribeCapacityMiBPerSec int

// If non-zero, will update the provisioned storage per partition.
PerPartitionBytes int64

// If specified, will update how long a published message is retained. To
// clear a retention duration (i.e. retain messages as long as there is
// available storage), set this to `pubsublite.InfiniteRetention`.
// available storage), set this to `InfiniteRetention`.
RetentionDuration optional.Duration
}

Expand All @@ -137,8 +135,8 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
PartitionConfig: &pb.Topic_PartitionConfig{
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
},
},
},
Expand Down Expand Up @@ -219,11 +217,11 @@ func (sc *SubscriptionConfig) toProto() *pb.Subscription {
}

func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) {
name, err := ParseSubscriptionPath(s.GetName())
name, err := parseSubscriptionPath(s.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName())
}
topic, err := ParseTopicPath(s.GetTopic())
topic, err := parseTopicPath(s.GetTopic())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic())
}
Expand Down
1 change: 1 addition & 0 deletions pubsublite/config_test.go
Expand Up @@ -92,6 +92,7 @@ func TestTopicConfigToProtoConversion(t *testing.T) {
PublishCapacityMiBPerSec: 4,
SubscribeCapacityMiBPerSec: 8,
PerPartitionBytes: 4294967296,
RetentionDuration: InfiniteRetention,
},
},
{
Expand Down

0 comments on commit 749473e

Please sign in to comment.