Skip to content

Commit

Permalink
fix(pubsublite): return an error if no topic or subscription fields w…
Browse files Browse the repository at this point in the history
…ere updated (#3502)

For consistency with pubsub implementation.
  • Loading branch information
tmdiep committed Jan 7, 2021
1 parent 8226811 commit a875969
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
29 changes: 23 additions & 6 deletions pubsublite/admin.go
Expand Up @@ -15,6 +15,7 @@ package pubsublite

import (
"context"
"errors"

"cloud.google.com/go/pubsublite/internal/wire"
"google.golang.org/api/option"
Expand All @@ -23,6 +24,11 @@ import (
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
errNoTopicFieldsUpdated = errors.New("pubsublite: no fields updated for topic")
errNoSubscriptionFieldsUpdated = errors.New("pubsublite: no fields updated for subscription")
)

// AdminClient provides admin operations for Cloud Pub/Sub Lite resources
// within a Google Cloud region. An AdminClient may be shared by multiple
// goroutines.
Expand All @@ -45,7 +51,8 @@ func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOpt
return &AdminClient{admin: admin}, nil
}

// CreateTopic creates a new topic from the given config.
// CreateTopic creates a new topic from the given config. If the topic already
// exists an error will be returned.
func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) {
req := &pb.CreateTopicRequest{
Parent: config.Name.location().String(),
Expand All @@ -60,9 +67,13 @@ func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*To
}

// UpdateTopic updates an existing topic from the given config and returns the
// new topic config.
// new topic config. UpdateTopic returns an error if no fields were modified.
func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) {
topicpb, err := ac.admin.UpdateTopic(ctx, config.toUpdateRequest())
req := config.toUpdateRequest()
if len(req.GetUpdateMask().GetPaths()) == 0 {
return nil, errNoTopicFieldsUpdated
}
topicpb, err := ac.admin.UpdateTopic(ctx, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -106,7 +117,8 @@ func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *Topic
}
}

// CreateSubscription creates a new subscription from the given config.
// CreateSubscription creates a new subscription from the given config. If the
// subscription already exists an error will be returned.
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
req := &pb.CreateSubscriptionRequest{
Parent: config.Name.location().String(),
Expand All @@ -121,9 +133,14 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti
}

// UpdateSubscription updates an existing subscription from the given config and
// returns the new subscription config.
// returns the new subscription config. UpdateSubscription returns an error if
// no fields were modified.
func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) {
subspb, err := ac.admin.UpdateSubscription(ctx, config.toUpdateRequest())
req := config.toUpdateRequest()
if len(req.GetUpdateMask().GetPaths()) == 0 {
return nil, errNoSubscriptionFieldsUpdated
}
subspb, err := ac.admin.UpdateSubscription(ctx, req)
if err != nil {
return nil, err
}
Expand Down
8 changes: 8 additions & 0 deletions pubsublite/admin_test.go
Expand Up @@ -95,6 +95,10 @@ func TestAdminTopicCRUD(t *testing.T) {
t.Errorf("UpdateTopic() got: %v\nwant: %v", gotConfig, topicConfig)
}

if _, err := admin.UpdateTopic(ctx, TopicConfigToUpdate{}); !test.ErrorEqual(err, errNoTopicFieldsUpdated) {
t.Errorf("UpdateTopic() got err: (%v), want err: (%v)", err, errNoTopicFieldsUpdated)
}

if gotConfig, err := admin.Topic(ctx, topicPath); err != nil {
t.Errorf("Topic() got err: %v", err)
} else if !testutil.Equal(gotConfig, &topicConfig) {
Expand Down Expand Up @@ -296,6 +300,10 @@ func TestAdminSubscriptionCRUD(t *testing.T) {
t.Errorf("UpdateSubscription() got: %v\nwant: %v", gotConfig, subscriptionConfig)
}

if _, err := admin.UpdateSubscription(ctx, SubscriptionConfigToUpdate{}); !test.ErrorEqual(err, errNoSubscriptionFieldsUpdated) {
t.Errorf("UpdateSubscription() got err: (%v), want err: (%v)", err, errNoSubscriptionFieldsUpdated)
}

if gotConfig, err := admin.Subscription(ctx, subscriptionPath); err != nil {
t.Errorf("Subscription() got err: %v", err)
} else if !testutil.Equal(gotConfig, &subscriptionConfig) {
Expand Down

0 comments on commit a875969

Please sign in to comment.