Skip to content

Commit

Permalink
fix(pubsublite)!: use strings for resource paths (api review) (#3559)
Browse files Browse the repository at this point in the history
The public API now only accepts resource paths as strings. Structs moved to internal.
  • Loading branch information
tmdiep committed Jan 14, 2021
1 parent 3bac8dd commit c18ed25
Show file tree
Hide file tree
Showing 16 changed files with 749 additions and 680 deletions.
17 changes: 4 additions & 13 deletions pubsublite/README.md
Expand Up @@ -14,7 +14,6 @@
```go
import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/ps"
)
```
Expand All @@ -23,13 +22,9 @@ To publish messages to a topic:

[snip]:# (publish)
```go
// Create a PublisherClient for topic1.
// Create a PublisherClient for topic1 in zone us-central1-b.
// See https://cloud.google.com/pubsub/lite/docs/locations for available zones.
topic := pubsublite.TopicPath{
Project: "project-id",
Zone: "us-central1-b",
TopicID: "topic1",
}
const topic = "projects/project-id/locations/us-central1-b/topics/topic1",
publisher, err := ps.NewPublisherClient(ctx, ps.DefaultPublishSettings, topic)
if err != nil {
log.Fatal(err)
Expand All @@ -52,12 +47,8 @@ To receive messages for a subscription:

[snip]:# (subscribe)
```go
// Create a SubscriberClient for subscription1.
subscription := pubsublite.SubscriptionPath{
Project: "project-id",
Zone: "us-central1-b",
SubscriptionID: "subscription1",
}
// Create a SubscriberClient for subscription1 in zone us-central1-b.
const subscription = "projects/project-id/locations/us-central1-b/subscriptions/subscription1"
subscriber, err := ps.NewSubscriberClient(ctx, ps.DefaultReceiveSettings, subscription)
if err != nil {
log.Fatal(err)
Expand Down
156 changes: 111 additions & 45 deletions pubsublite/admin.go
Expand Up @@ -29,17 +29,18 @@ var (
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
)

// AdminClient provides admin operations for Cloud Pub/Sub Lite resources
// within a Google Cloud region. An AdminClient may be shared by multiple
// goroutines.
// AdminClient provides admin operations for Pub/Sub Lite resources within a
// Google Cloud region. The zone component of resource paths must be within this
// region. See https://cloud.google.com/pubsub/lite/docs/locations for the list
// of zones where Pub/Sub Lite is available.
//
// An AdminClient may be shared by multiple goroutines.
type AdminClient struct {
admin *vkit.AdminClient
}

// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
// operations for resources within a given region.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Cloud Pub/Sub Lite is available.
// NewAdminClient creates a new Pub/Sub Lite client to perform admin operations
// for resources within a given region.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
if err := wire.ValidateRegion(region); err != nil {
return nil, err
Expand All @@ -54,10 +55,14 @@ func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOpt
// CreateTopic creates a new topic from the given config. If the topic already
// exists an error will be returned.
func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) {
topicPath, err := wire.ParseTopicPath(config.Name)
if err != nil {
return nil, err
}
req := &pb.CreateTopicRequest{
Parent: config.Name.location().String(),
Parent: topicPath.Location().String(),
Topic: config.toProto(),
TopicId: config.Name.TopicID,
TopicId: topicPath.TopicID,
}
topicpb, err := ac.admin.CreateTopic(ctx, req)
if err != nil {
Expand All @@ -69,6 +74,9 @@ func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*To
// UpdateTopic updates an existing topic from the given config and returns the
// new topic config. UpdateTopic returns an error if no fields were modified.
func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) {
if _, err := wire.ParseTopicPath(config.Name); err != nil {
return nil, err
}
req := config.toUpdateRequest()
if len(req.GetUpdateMask().GetPaths()) == 0 {
return nil, errNoTopicFieldsUpdated
Expand All @@ -80,62 +88,93 @@ func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpda
return protoToTopicConfig(topicpb)
}

// DeleteTopic deletes a topic.
func (ac *AdminClient) DeleteTopic(ctx context.Context, topic TopicPath) error {
return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic.String()})
// DeleteTopic deletes a topic. A valid topic path has the format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func (ac *AdminClient) DeleteTopic(ctx context.Context, topic string) error {
if _, err := wire.ParseTopicPath(topic); err != nil {
return err
}
return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic})
}

// Topic retrieves the configuration of a topic.
func (ac *AdminClient) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, error) {
topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic.String()})
// Topic retrieves the configuration of a topic. A valid topic path has the
// format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func (ac *AdminClient) Topic(ctx context.Context, topic string) (*TopicConfig, error) {
if _, err := wire.ParseTopicPath(topic); err != nil {
return nil, err
}
topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic})
if err != nil {
return nil, err
}
return protoToTopicConfig(topicpb)
}

// TopicPartitions returns the number of partitions for a topic.
func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) {
partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()})
// TopicPartitions returns the number of partitions for a topic. A valid topic
// path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func (ac *AdminClient) TopicPartitions(ctx context.Context, topic string) (int, error) {
if _, err := wire.ParseTopicPath(topic); err != nil {
return 0, err
}
partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic})
if err != nil {
return 0, err
}
return int(partitions.GetPartitionCount()), nil
}

// TopicSubscriptions retrieves the list of subscription paths for a topic.
func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic TopicPath) *SubscriptionPathIterator {
// A valid topic path has the format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic string) *SubscriptionPathIterator {
if _, err := wire.ParseTopicPath(topic); err != nil {
return &SubscriptionPathIterator{err: err}
}
return &SubscriptionPathIterator{
it: ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic.String()}),
it: ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic}),
}
}

// Topics retrieves the list of topic configs for a given project and zone.
func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *TopicIterator {
// A valid parent path has the format: "projects/PROJECT_ID/locations/ZONE".
func (ac *AdminClient) Topics(ctx context.Context, parent string) *TopicIterator {
if _, err := wire.ParseLocationPath(parent); err != nil {
return &TopicIterator{err: err}
}
return &TopicIterator{
it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: location.String()}),
it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: parent}),
}
}

// CreateSubscription creates a new subscription from the given config. If the
// subscription already exists an error will be returned.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
subsPath, err := wire.ParseSubscriptionPath(config.Name)
if err != nil {
return nil, err
}
if _, err := wire.ParseTopicPath(config.Topic); err != nil {
return nil, err
}
req := &pb.CreateSubscriptionRequest{
Parent: config.Name.location().String(),
Parent: subsPath.Location().String(),
Subscription: config.toProto(),
SubscriptionId: config.Name.SubscriptionID,
SubscriptionId: subsPath.SubscriptionID,
}
subspb, err := ac.admin.CreateSubscription(ctx, req)
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
return protoToSubscriptionConfig(subspb), nil
}

// UpdateSubscription updates an existing subscription from the given config and
// returns the new subscription config. UpdateSubscription returns an error if
// no fields were modified.
func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) {
if _, err := wire.ParseSubscriptionPath(config.Name); err != nil {
return nil, err
}
req := config.toUpdateRequest()
if len(req.GetUpdateMask().GetPaths()) == 0 {
return nil, errNoSubscriptionFieldsUpdated
Expand All @@ -144,28 +183,41 @@ func (ac *AdminClient) UpdateSubscription(ctx context.Context, config Subscripti
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
return protoToSubscriptionConfig(subspb), nil
}

// DeleteSubscription deletes a subscription.
func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription SubscriptionPath) error {
return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription.String()})
// DeleteSubscription deletes a subscription. A valid subscription path has the
// format: "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription string) error {
if _, err := wire.ParseSubscriptionPath(subscription); err != nil {
return err
}
return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription})
}

// Subscription retrieves the configuration of a subscription.
func (ac *AdminClient) Subscription(ctx context.Context, subscription SubscriptionPath) (*SubscriptionConfig, error) {
subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription.String()})
// Subscription retrieves the configuration of a subscription. A valid
// subscription name has the format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func (ac *AdminClient) Subscription(ctx context.Context, subscription string) (*SubscriptionConfig, error) {
if _, err := wire.ParseSubscriptionPath(subscription); err != nil {
return nil, err
}
subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription})
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
return protoToSubscriptionConfig(subspb), nil
}

// Subscriptions retrieves the list of subscription configs for a given project
// and zone.
func (ac *AdminClient) Subscriptions(ctx context.Context, location LocationPath) *SubscriptionIterator {
// and zone. A valid parent path has the format:
// "projects/PROJECT_ID/locations/ZONE".
func (ac *AdminClient) Subscriptions(ctx context.Context, parent string) *SubscriptionIterator {
if _, err := wire.ParseLocationPath(parent); err != nil {
return &SubscriptionIterator{err: err}
}
return &SubscriptionIterator{
it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: location.String()}),
it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: parent}),
}
}

Expand All @@ -178,12 +230,16 @@ func (ac *AdminClient) Close() error {

// TopicIterator is an iterator that returns a list of topic configs.
type TopicIterator struct {
it *vkit.TopicIterator
it *vkit.TopicIterator
err error
}

// Next returns the next topic config. The second return value will be
// iterator.Done if there are no more topic configs.
func (t *TopicIterator) Next() (*TopicConfig, error) {
if t.err != nil {
return nil, t.err
}
topicpb, err := t.it.Next()
if err != nil {
return nil, err
Expand All @@ -194,31 +250,41 @@ func (t *TopicIterator) Next() (*TopicConfig, error) {
// SubscriptionIterator is an iterator that returns a list of subscription
// configs.
type SubscriptionIterator struct {
it *vkit.SubscriptionIterator
it *vkit.SubscriptionIterator
err error
}

// Next returns the next subscription config. The second return value will be
// iterator.Done if there are no more subscription configs.
func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) {
if s.err != nil {
return nil, s.err
}
subspb, err := s.it.Next()
if err != nil {
return nil, err
}
return protoToSubscriptionConfig(subspb)
return protoToSubscriptionConfig(subspb), nil
}

// SubscriptionPathIterator is an iterator that returns a list of subscription
// paths.
type SubscriptionPathIterator struct {
it *vkit.StringIterator
it *vkit.StringIterator
err error
}

// Next returns the next subscription path. The second return value will be
// iterator.Done if there are no more subscription paths.
func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) {
// Next returns the next subscription path, which has format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". The
// second return value will be iterator.Done if there are no more subscription
// paths.
func (sp *SubscriptionPathIterator) Next() (string, error) {
if sp.err != nil {
return "", sp.err
}
subsPath, err := sp.it.Next()
if err != nil {
return SubscriptionPath{}, err
return "", err
}
return parseSubscriptionPath(subsPath)
return subsPath, nil
}

0 comments on commit c18ed25

Please sign in to comment.