Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite)!: add separate publisher and subscriber client constructors with settings (api review) #3528

Merged
merged 10 commits into from Jan 27, 2021
6 changes: 3 additions & 3 deletions pubsublite/README.md
Expand Up @@ -24,8 +24,8 @@ To publish messages to a topic:
```go
// Create a PublisherClient for topic1 in zone us-central1-b.
// See https://cloud.google.com/pubsub/lite/docs/locations for available zones.
const topic = "projects/project-id/locations/us-central1-b/topics/topic1",
publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic)
const topic = "projects/project-id/locations/us-central1-b/topics/topic1"
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
log.Fatal(err)
}
Expand All @@ -49,7 +49,7 @@ To receive messages for a subscription:
```go
// Create a SubscriberClient for subscription1 in zone us-central1-b.
const subscription = "projects/project-id/locations/us-central1-b/subscriptions/subscription1"
subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscription)
subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
if err != nil {
log.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/doc.go
Expand Up @@ -77,7 +77,7 @@ differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat.

To publish messages to a topic, first create a PublisherClient:

publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topicPath)
publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
// TODO: Handle error.
}
Expand Down Expand Up @@ -136,7 +136,7 @@ Receiving

To receive messages for a subscription, first create a SubscriberClient:

subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscriptionPath)
subscriber, err := pscompat.NewSubscriberClient(ctx, ubscriptionPath)

Messages are then consumed from a subscription via callback.

Expand Down
43 changes: 37 additions & 6 deletions pubsublite/pscompat/example_test.go
Expand Up @@ -16,6 +16,7 @@ package pscompat_test
import (
"context"
"fmt"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
Expand All @@ -24,8 +25,39 @@ import (
func ExamplePublisherClient_Publish() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
// NOTE: DefaultPublishSettings and empty PublishSettings{} are equivalent.
publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic)
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
// TODO: Handle error.
}
defer publisher.Stop()

var results []*pubsub.PublishResult
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
}
fmt.Printf("Published a message with a message ID: %s\n", id)
}
}

// This example illustrates how to set batching settings for publishing. Note
// that batching settings apply per partition. If BufferedByteLimit is being
// used to bound memory usage, keep in mind the number of partitions in the
// topic.
func ExamplePublisherClient_Publish_batchingSettings() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
settings := pscompat.DefaultPublishSettings
settings.DelayThreshold = 50 * time.Millisecond
settings.CountThreshold = 200
settings.BufferedByteLimit = 5e8
publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings)
if err != nil {
// TODO: Handle error.
}
Expand All @@ -49,7 +81,7 @@ func ExamplePublisherClient_Publish() {
func ExamplePublisherClient_Error() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic)
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
// TODO: Handle error.
}
Expand All @@ -76,8 +108,7 @@ func ExamplePublisherClient_Error() {
func ExampleSubscriberClient_Receive() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
// NOTE: DefaultReceiveSettings and empty ReceiveSettings{} are equivalent.
subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscription)
subscriber, err := pscompat.NewSubscriberClient(ctx, subscription)
if err != nil {
// TODO: Handle error.
}
Expand Down Expand Up @@ -106,7 +137,7 @@ func ExampleSubscriberClient_Receive_maxOutstanding() {
settings := pscompat.DefaultReceiveSettings
settings.MaxOutstandingMessages = 5
settings.MaxOutstandingBytes = 10e6
subscriber, err := pscompat.NewSubscriberClient(ctx, settings, subscription)
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
}
Expand Down
4 changes: 2 additions & 2 deletions pubsublite/pscompat/integration_test.go
Expand Up @@ -81,7 +81,7 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio

func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings, topic wire.TopicPath, opts ...option.ClientOption) *PublisherClient {
opts = testOptions(ctx, t, opts...)
pub, err := NewPublisherClient(ctx, settings, topic.String(), opts...)
pub, err := NewPublisherClientWithSettings(ctx, topic.String(), settings, opts...)
if err != nil {
t.Fatalf("Failed to create publisher client: %v", err)
}
Expand All @@ -90,7 +90,7 @@ func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings

func subscriberClient(ctx context.Context, t *testing.T, settings ReceiveSettings, subscription wire.SubscriptionPath, opts ...option.ClientOption) *SubscriberClient {
opts = testOptions(ctx, t, opts...)
sub, err := NewSubscriberClient(ctx, settings, subscription.String(), opts...)
sub, err := NewSubscriberClientWithSettings(ctx, subscription.String(), settings, opts...)
if err != nil {
t.Fatalf("Failed to create publisher client: %v", err)
}
Expand Down
15 changes: 11 additions & 4 deletions pubsublite/pscompat/publisher.go
Expand Up @@ -68,10 +68,17 @@ type PublisherClient struct {
err error
}

// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to a
// given topic. A valid topic path has the format:
// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClient(ctx context.Context, settings PublishSettings, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to
// a given topic, using DefaultPublishSettings. A valid topic path has the
// format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) {
return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...)
}

// NewPublisherClientWithSettings creates a new Pub/Sub Lite client to publish
// messages to a given topic, using the specified PublishSettings. A valid topic
// path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID".
func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) {
topicPath, err := wire.ParseTopicPath(topic)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions pubsublite/pscompat/settings.go
Expand Up @@ -63,8 +63,8 @@ type PublishSettings struct {
// DefaultPublishSettings.ByteThreshold. Otherwise must be > 0.
ByteThreshold int

// The maximum time that the client will attempt to establish a publish stream
// connection to the server. If Timeout is 0, it will be treated as
// The maximum time that the client will attempt to open a publish stream
// to the server. If Timeout is 0, it will be treated as
// DefaultPublishSettings.Timeout. Otherwise must be > 0.
//
// The timeout is exceeded, the publisher will terminate with the last error
Expand Down Expand Up @@ -170,8 +170,8 @@ type ReceiveSettings struct {
// the associated topic.
MaxOutstandingBytes int

// The maximum time that the client will attempt to establish a subscribe
// stream connection to the server. If Timeout is 0, it will be treated as
// The maximum time that the client will attempt to open a subscribe stream
// to the server. If Timeout is 0, it will be treated as
// DefaultReceiveSettings.Timeout. Otherwise must be > 0.
//
// The timeout is exceeded, the SubscriberClient will terminate with the last
Expand Down
13 changes: 11 additions & 2 deletions pubsublite/pscompat/subscriber.go
Expand Up @@ -234,9 +234,18 @@ type SubscriberClient struct {
}

// NewSubscriberClient creates a new Pub/Sub Lite client to receive messages for
// a given subscription. A valid subscription path has the format:
// a given subscription, using DefaultReceiveSettings. A valid subscription path
// has the format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func NewSubscriberClient(ctx context.Context, settings ReceiveSettings, subscription string, opts ...option.ClientOption) (*SubscriberClient, error) {
func NewSubscriberClient(ctx context.Context, subscription string, opts ...option.ClientOption) (*SubscriberClient, error) {
return NewSubscriberClientWithSettings(ctx, subscription, DefaultReceiveSettings, opts...)
}

// NewSubscriberClientWithSettings creates a new Pub/Sub Lite client to receive
// messages for a given subscription, using the specified ReceiveSettings. A
// valid subscription path has the format:
// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID".
func NewSubscriberClientWithSettings(ctx context.Context, subscription string, settings ReceiveSettings, opts ...option.ClientOption) (*SubscriberClient, error) {
subscriptionPath, err := wire.ParseSubscriptionPath(subscription)
if err != nil {
return nil, err
Expand Down