Skip to content

Commit

Permalink
feat(pubsub): add list configs for topic & sub (#4607)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Sep 8, 2021
1 parent fc668f6 commit a6550c5
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 3 deletions.
22 changes: 21 additions & 1 deletion pubsub/subscription.go
Expand Up @@ -33,6 +33,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
durpb "google.golang.org/protobuf/types/known/durationpb"

vkit "cloud.google.com/go/pubsub/apiv1"
)

// Subscription is a reference to a PubSub subscription.
Expand Down Expand Up @@ -85,7 +87,8 @@ func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
Project: c.fullyQualifiedProjectName(),
})
return &SubscriptionIterator{
c: c,
c: c,
it: it,
next: func() (string, error) {
sub, err := it.Next()
if err != nil {
Expand All @@ -99,6 +102,7 @@ func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
// SubscriptionIterator is an iterator that returns a series of subscriptions.
type SubscriptionIterator struct {
c *Client
it *vkit.SubscriptionIterator
next func() (string, error)
}

Expand All @@ -111,6 +115,22 @@ func (subs *SubscriptionIterator) Next() (*Subscription, error) {
return &Subscription{c: subs.c, name: subName}, nil
}

// NextConfig returns the next subscription config. If there are no more subscriptions,
// iterator.Done will be returned.
// This call shares the underlying iterator with calls to `SubscriptionIterator.Next`.
// If you wish to use mix calls, create separate iterator instances for both.
func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error) {
spb, err := subs.it.Next()
if err != nil {
return nil, err
}
cfg, err := protoToSubscriptionConfig(spb, subs.c)
if err != nil {
return nil, err
}
return &cfg, nil
}

// PushConfig contains configuration for subscriptions that operate in push mode.
type PushConfig struct {
// A URL locating the endpoint to which messages should be pushed.
Expand Down
15 changes: 15 additions & 0 deletions pubsub/subscription_test.go
Expand Up @@ -81,6 +81,21 @@ func TestListProjectSubscriptions(t *testing.T) {
if !testutil.Equal(got, want) {
t.Errorf("got %v, want %v", got, want)
}

// Call list again, but check the config this time.
it := c.Subscriptions(ctx)
for {
sub, err := it.NextConfig()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("SubscriptionIterator.NextConfig() got err: %v", err)
}
if got := sub.Topic.ID(); got != topic.ID() {
t.Errorf("subConfig.Topic mismatch, got: %v, want: %v", got, topic.ID())
}
}
}

func getSubIDs(subs []*Subscription) []string {
Expand Down
18 changes: 17 additions & 1 deletion pubsub/topic.go
Expand Up @@ -27,6 +27,7 @@ import (
"cloud.google.com/go/iam"
"cloud.google.com/go/internal/optional"
ipubsub "cloud.google.com/go/internal/pubsub"
vkit "cloud.google.com/go/pubsub/apiv1"
"cloud.google.com/go/pubsub/internal/scheduler"
gax "github.com/googleapis/gax-go/v2"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -381,7 +382,8 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
func (c *Client) Topics(ctx context.Context) *TopicIterator {
it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
return &TopicIterator{
c: c,
c: c,
it: it,
next: func() (string, error) {
topic, err := it.Next()
if err != nil {
Expand All @@ -395,6 +397,7 @@ func (c *Client) Topics(ctx context.Context) *TopicIterator {
// TopicIterator is an iterator that returns a series of topics.
type TopicIterator struct {
c *Client
it *vkit.TopicIterator
next func() (string, error)
}

Expand All @@ -407,6 +410,19 @@ func (tps *TopicIterator) Next() (*Topic, error) {
return newTopic(tps.c, topicName), nil
}

// NextConfig returns the next topic config. If there are no more topics,
// iterator.Done will be returned.
// This call shares the underlying iterator with calls to `TopicIterator.Next`.
// If you wish to use mix calls, create separate iterator instances for both.
func (t *TopicIterator) NextConfig() (*TopicConfig, error) {
tpb, err := t.it.Next()
if err != nil {
return nil, err
}
cfg := protoToTopicConfig(tpb)
return &cfg, nil
}

// ID returns the unique identifier of the topic within its project.
func (t *Topic) ID() string {
slash := strings.LastIndex(t.name, "/")
Expand Down
21 changes: 20 additions & 1 deletion pubsub/topic_test.go
Expand Up @@ -106,17 +106,36 @@ func TestCreateTopicWithConfig(t *testing.T) {
}

func TestListTopics(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

var ids []string
for i := 1; i <= 4; i++ {
numTopics := 4
for i := 1; i <= numTopics; i++ {
id := fmt.Sprintf("t%d", i)
ids = append(ids, id)
mustCreateTopic(t, c, id)
}
checkTopicListing(t, c, ids)

var tt []*TopicConfig
it := c.Topics(ctx)
for {
topic, err := it.NextConfig()
if err == iterator.Done {
break
}
if err != nil {
t.Errorf("TopicIterator.NextConfig() got err: %v", err)
} else {
tt = append(tt, topic)
}
}
if got := len(tt); got != numTopics {
t.Errorf("c.Topics(ctx) returned %d topics, expected %d", got, numTopics)
}
}

func TestListCompletelyEmptyTopics(t *testing.T) {
Expand Down

0 comments on commit a6550c5

Please sign in to comment.