Skip to content

Commit

Permalink
fix(pubsublite)!: add separate publisher and subscriber client constr…
Browse files Browse the repository at this point in the history
…uctors with settings (api review) (#3528)

NewPublisherClient and NewSubscriberClient constructors now use default settings. Separate constructors were added for users to provide custom settings.
  • Loading branch information
tmdiep committed Jan 27, 2021
1 parent e663d6c commit 98637e0
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 23 deletions.
6 changes: 3 additions & 3 deletions pubsublite/README.md
Expand Up @@ -24,8 +24,8 @@ To publish messages to a topic:
```go
// Create a PublisherClient for topic1 in zone us-central1-b.
// See https://cloud.google.com/pubsub/lite/docs/locations for available zones.
const topic = "projects/project-id/locations/us-central1-b/topics/topic1",
publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic)
const topic = "projects/project-id/locations/us-central1-b/topics/topic1"
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
log.Fatal(err)
}
Expand All @@ -49,7 +49,7 @@ To receive messages for a subscription:
```go
// Create a SubscriberClient for subscription1 in zone us-central1-b.
const subscription = "projects/project-id/locations/us-central1-b/subscriptions/subscription1"
subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscription)
subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/doc.go
Expand Up @@ -77,7 +77,7 @@ differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat.
To publish messages to a topic, first create a PublisherClient:
publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topicPath)
publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
// TODO: Handle error.
}
Expand Down Expand Up @@ -136,7 +136,7 @@ Receiving
To receive messages for a subscription, first create a SubscriberClient:
subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscriptionPath)
subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath)
Messages are then consumed from a subscription via callback.
Expand Down
43 changes: 37 additions & 6 deletions pubsublite/pscompat/example_test.go
Expand Up @@ -16,6 +16,7 @@ package pscompat_test
import (
"context"
"fmt"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
Expand All @@ -24,8 +25,39 @@ import (
func ExamplePublisherClient_Publish() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
// NOTE: DefaultPublishSettings and empty PublishSettings{} are equivalent.
publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic)
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}

// This example illustrates how to set batching settings for publishing. Note
// that batching settings apply per partition. If BufferedByteLimit is being
// used to bound memory usage, keep in mind the number of partitions in the
// topic.
func ExamplePublisherClient_Publish_batchingSettings() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
settings := pscompat.DefaultPublishSettings
settings.DelayThreshold = 50 * time.Millisecond
settings.CountThreshold = 200
settings.BufferedByteLimit = 5e8
publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
if err != nil {
// TODO: Handle error.
}
Expand All @@ -49,7 +81,7 @@ func ExamplePublisherClient_Publish() {
func ExamplePublisherClient_Error() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic)
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
// TODO: Handle error.
}
Expand All @@ -76,8 +108,7 @@ func ExamplePublisherClient_Error() {
func ExampleSubscriberClient_Receive() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
// NOTE: DefaultReceiveSettings and empty ReceiveSettings{} are equivalent.
subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscription)
subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
if err != nil {
// TODO: Handle error.
}
Expand Down Expand Up @@ -106,7 +137,7 @@ func ExampleSubscriberClient_Receive_maxOutstanding() {
settings := pscompat.DefaultReceiveSettings
settings.MaxOutstandingMessages = 5
settings.MaxOutstandingBytes = 10e6
subscriber, err := pscompat.NewSubscriberClient(ctx, settings, subscription)
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
}
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/pscompat/integration_test.go
Expand Up @@ -81,7 +81,7 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio

func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings, topic wire.TopicPath, opts ...option.ClientOption) *PublisherClient {
opts = testOptions(ctx, t, opts...)
pub, err := NewPublisherClient(ctx, settings, topic.String(), opts...)
pub, err := NewPublisherClientWithSettings(ctx, topic.String(), settings, opts...)
if err != nil {
t.Fatalf("Failed to create publisher client: %v", err)
}
Expand All @@ -90,7 +90,7 @@ func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings

func subscriberClient(ctx context.Context, t *testing.T, settings ReceiveSettings, subscription wire.SubscriptionPath, opts ...option.ClientOption) *SubscriberClient {
opts = testOptions(ctx, t, opts...)
sub, err := NewSubscriberClient(ctx, settings, subscription.String(), opts...)
sub, err := NewSubscriberClientWithSettings(ctx, subscription.String(), settings, opts...)
if err != nil {
t.Fatalf("Failed to create publisher client: %v", err)
}
Expand Down
15 changes: 11 additions & 4 deletions pubsublite/pscompat/publisher.go
Expand Up @@ -68,10 +68,17 @@ type PublisherClient struct {
err error
}

// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to a
// given topic. A valid topic path has the format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClient(ctx context.Context, settings PublishSettings, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to
// a given topic, using DefaultPublishSettings. A valid topic path has the
// format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...)
}

// NewPublisherClientWithSettings creates a new Pub/Sub Lite client to publish
// messages to a given topic, using the specified PublishSettings. A valid topic
// path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) {
topicPath, err := wire.ParseTopicPath(topic)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions pubsublite/pscompat/settings.go
Expand Up @@ -63,8 +63,8 @@ type PublishSettings struct {
// DefaultPublishSettings.ByteThreshold. Otherwise must be > 0.
ByteThreshold int

// The maximum time that the client will attempt to establish a publish stream
// connection to the server. If Timeout is 0, it will be treated as
// The maximum time that the client will attempt to open a publish stream
// to the server. If Timeout is 0, it will be treated as
// DefaultPublishSettings.Timeout. Otherwise must be > 0.
//
// The timeout is exceeded, the publisher will terminate with the last error
Expand Down Expand Up @@ -170,8 +170,8 @@ type ReceiveSettings struct {
// the associated topic.
MaxOutstandingBytes int

// The maximum time that the client will attempt to establish a subscribe
// stream connection to the server. If Timeout is 0, it will be treated as
// The maximum time that the client will attempt to open a subscribe stream
// to the server. If Timeout is 0, it will be treated as
// DefaultReceiveSettings.Timeout. Otherwise must be > 0.
//
// The timeout is exceeded, the SubscriberClient will terminate with the last
Expand Down
13 changes: 11 additions & 2 deletions pubsublite/pscompat/subscriber.go
Expand Up @@ -234,9 +234,18 @@ type SubscriberClient struct {
}

// NewSubscriberClient creates a new Pub/Sub Lite client to receive messages for
// a given subscription. A valid subscription path has the format:
// a given subscription, using DefaultReceiveSettings. A valid subscription path
// has the format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func NewSubscriberClient(ctx context.Context, settings ReceiveSettings, subscription string, opts ...option.ClientOption) (*SubscriberClient, error) {
func NewSubscriberClient(ctx context.Context, subscription string, opts ...option.ClientOption) (*SubscriberClient, error) {
return NewSubscriberClientWithSettings(ctx, subscription, DefaultReceiveSettings, opts...)
}

// NewSubscriberClientWithSettings creates a new Pub/Sub Lite client to receive
// messages for a given subscription, using the specified ReceiveSettings. A
// valid subscription path has the format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func NewSubscriberClientWithSettings(ctx context.Context, subscription string, settings ReceiveSettings, opts ...option.ClientOption) (*SubscriberClient, error) {
subscriptionPath, err := wire.ParseSubscriptionPath(subscription)
if err != nil {
return nil, err
Expand Down

0 comments on commit 98637e0

Please sign in to comment.