From 5b5d0f782b224f324dcfa13cc4145ee33a395d09 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Wed, 23 Jun 2021 09:46:23 +1000 Subject: [PATCH] fix(pubsublite): make SubscriberClient.Receive identical to pubsub (#4281) - Ensures that pscompat.SubscriberClient.Receive and pubsub.Subscription.Receive have identical interfaces. - Adds examples for how to declare common interfaces for pubsublite and pubsub. --- pubsublite/doc.go | 36 +++++++++------ pubsublite/internal/wire/README.md | 3 +- pubsublite/pscompat/doc.go | 8 +++- pubsublite/pscompat/example_test.go | 64 ++++++++++++++++++++++++++ pubsublite/pscompat/subscriber.go | 31 +++++-------- pubsublite/pscompat/subscriber_test.go | 2 +- 6 files changed, 106 insertions(+), 38 deletions(-) diff --git a/pubsublite/doc.go b/pubsublite/doc.go index 07abbea798d..3ec736d8adc 100644 --- a/pubsublite/doc.go +++ b/pubsublite/doc.go @@ -29,14 +29,30 @@ https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite. More information about Pub/Sub Lite is available at https://cloud.google.com/pubsub/lite. +See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, +connection pooling and similar aspects of this package. + Note: This library is in BETA. Backwards-incompatible changes may be made before stable v1.0.0 is released. Introduction -See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, -connection pooling and similar aspects of this package. +Examples can be found at +https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples +and +https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples. + +Complete sample programs can be found at +https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite. + +The cloud.google.com/go/pubsublite/pscompat subpackage contains clients for +publishing and receiving messages, which have similar interfaces to their +pubsub.Topic and pubsub.Subscription counterparts in cloud.google.com/go/pubsub. +The following examples demonstrate how to declare common interfaces: +https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface +and +https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface. The following imports are required for code snippets below: @@ -46,11 +62,6 @@ The following imports are required for code snippets below: "cloud.google.com/go/pubsublite/pscompat" ) -More complete examples can be found at -https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples -and -https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples. - Creating Topics @@ -83,11 +94,6 @@ where Pub/Sub Lite is available. Publishing -The pubsublite/pscompat subpackage contains clients for publishing and receiving -messages, which have similar interfaces to their pubsub.Topic and -pubsub.Subscription counterparts in the Cloud Pub/Sub library: -https://pkg.go.dev/cloud.google.com/go/pubsub. - Pub/Sub Lite uses gRPC streams extensively for high throughput. For more differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat. @@ -118,7 +124,7 @@ service: Once you've finishing publishing all messages, call Stop to flush all messages to the service and close gRPC streams. The PublisherClient can no longer be used -after it has been stopped or has terminated due to a permanent service error. +after it has been stopped or has terminated due to a permanent error. publisher.Stop() @@ -167,8 +173,8 @@ subscriber client is connected to). // TODO: Handle error. } -Receive blocks until either the context is canceled or a fatal service error -occurs. To terminate a call to Receive, cancel its context: +Receive blocks until either the context is canceled or a permanent error occurs. +To terminate a call to Receive, cancel its context: cancel() diff --git a/pubsublite/internal/wire/README.md b/pubsublite/internal/wire/README.md index 133e1fed5b3..bd1774805fa 100644 --- a/pubsublite/internal/wire/README.md +++ b/pubsublite/internal/wire/README.md @@ -1,7 +1,6 @@ # Wire -This directory contains internal implementation details for Cloud Pub/Sub Lite. -Its exported interface can change at any time. +This directory contains internal implementation details for Pub/Sub Lite. ## Conventions diff --git a/pubsublite/pscompat/doc.go b/pubsublite/pscompat/doc.go index 87b68b1b449..deb6f15d721 100644 --- a/pubsublite/pscompat/doc.go +++ b/pubsublite/pscompat/doc.go @@ -19,7 +19,10 @@ This package is designed to compatible with the Cloud Pub/Sub library: https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the client application, PublisherClient and SubscriberClient can be used as substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(), -respectively, from the pubsub package. +respectively, from the pubsub package. See the following examples: +https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface +and +https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface. The Cloud Pub/Sub and Pub/Sub Lite services have some differences: - Pub/Sub Lite does not support NACK for messages. By default, this will @@ -42,6 +45,9 @@ https://cloud.google.com/pubsub/lite. Information about choosing between Cloud Pub/Sub vs Pub/Sub Lite is available at https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite. +Complete sample programs can be found at +https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite. + See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package. */ diff --git a/pubsublite/pscompat/example_test.go b/pubsublite/pscompat/example_test.go index 7ca12e680ad..7dc32ca3b93 100644 --- a/pubsublite/pscompat/example_test.go +++ b/pubsublite/pscompat/example_test.go @@ -292,6 +292,70 @@ func ExampleSubscriberClient_Receive_manualPartitionAssignment() { cancel() } +// This example illustrates how to declare a common interface for publisher +// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite +// (cloud.google.com/go/pubsublite/pscompat). +func ExampleNewPublisherClient_interface() { + // publisherInterface is implemented by both pscompat.PublisherClient and + // pubsub.Topic. + type publisherInterface interface { + Publish(context.Context, *pubsub.Message) *pubsub.PublishResult + Stop() + } + + publish := func(publisher publisherInterface) { + defer publisher.Stop() + // TODO: Publish messages. + } + + // Create a Pub/Sub Lite publisher client. + ctx := context.Background() + publisher, err := pscompat.NewPublisherClient(ctx, "projects/my-project/locations/zone/topics/my-topic") + if err != nil { + // TODO: Handle error. + } + publish(publisher) + + // Create a Cloud Pub/Sub topic to publish. + client, err := pubsub.NewClient(ctx, "my-project") + if err != nil { + // TODO: Handle error. + } + topic := client.Topic("my-topic") + publish(topic) +} + +// This example illustrates how to declare a common interface for subscriber +// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite +// (cloud.google.com/go/pubsublite/pscompat). +func ExampleNewSubscriberClient_interface() { + // subscriberInterface is implemented by both pscompat.SubscriberClient and + // pubsub.Subscription. + type subscriberInterface interface { + Receive(context.Context, func(context.Context, *pubsub.Message)) error + } + + receive := func(subscriber subscriberInterface) { + // TODO: Receive messages. + } + + // Create a Pub/Sub Lite subscriber client. + ctx := context.Background() + subscriber, err := pscompat.NewSubscriberClient(ctx, "projects/my-project/locations/zone/subscriptions/my-subscription") + if err != nil { + // TODO: Handle error. + } + receive(subscriber) + + // Create a Cloud Pub/Sub subscription to receive. + client, err := pubsub.NewClient(ctx, "my-project") + if err != nil { + // TODO: Handle error. + } + subscription := client.Subscription("my-subscription") + receive(subscription) +} + func ExampleParseMessageMetadata_publisher() { ctx := context.Background() const topic = "projects/my-project/locations/zone/topics/my-topic" diff --git a/pubsublite/pscompat/subscriber.go b/pubsublite/pscompat/subscriber.go index acfb356ca31..dafb25ee526 100644 --- a/pubsublite/pscompat/subscriber.go +++ b/pubsublite/pscompat/subscriber.go @@ -86,11 +86,13 @@ func (f *wireSubscriberFactoryImpl) New(receiver wire.MessageReceiverFunc) (wire return wire.NewSubscriber(context.Background(), f.settings, receiver, f.region, f.subscription.String(), f.options...) } +type messageReceiverFunc = func(context.Context, *pubsub.Message) + // subscriberInstance wraps an instance of a wire.Subscriber. A new instance is // created for each invocation of SubscriberClient.Receive(). type subscriberInstance struct { settings ReceiveSettings - receiver MessageReceiverFunc + receiver messageReceiverFunc recvCtx context.Context // Context passed to the receiver recvCancel context.CancelFunc // Corresponding cancel func for recvCtx wireSub wire.Subscriber @@ -101,7 +103,7 @@ type subscriberInstance struct { err error } -func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver MessageReceiverFunc) (*subscriberInstance, error) { +func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver messageReceiverFunc) (*subscriberInstance, error) { recvCtx, recvCancel := context.WithCancel(ctx) subInstance := &subscriberInstance{ settings: settings, @@ -221,17 +223,6 @@ func (si *subscriberInstance) Wait(ctx context.Context) error { return err } -// MessageReceiverFunc handles messages sent by the Pub/Sub Lite service. -// -// The implementation must arrange for pubsub.Message.Ack() or -// pubsub.Message.Nack() to be called after processing the message. -// -// The receiver func will be called from multiple goroutines if the subscriber -// is connected to multiple partitions. Only one call from any connected -// partition will be outstanding at a time, and blocking in this receiver -// callback will block the delivery of subsequent messages for the partition. -type MessageReceiverFunc func(context.Context, *pubsub.Message) - // SubscriberClient is a Pub/Sub Lite client to receive messages for a given // subscription. // @@ -292,18 +283,20 @@ func NewSubscriberClientWithSettings(ctx context.Context, subscription string, s // If there is a fatal service error, Receive returns that error after all of // the outstanding calls to f have returned. If ctx is done, Receive returns nil // after all of the outstanding calls to f have returned and all messages have -// been acknowledged. +// been acknowledged. The context passed to f will be canceled when ctx is Done +// or there is a fatal service error. // // Receive calls f concurrently from multiple goroutines if the SubscriberClient -// is connected to multiple partitions. All messages received by f must be ACKed -// or NACKed. Failure to do so can prevent Receive from returning. +// is connected to multiple partitions. Only one call from any connected +// partition will be outstanding at a time, and blocking in the receiver +// callback f will block the delivery of subsequent messages for the partition. // -// The context passed to f will be canceled when ctx is Done or there is a fatal -// service error. +// All messages received by f must be ACKed or NACKed. Failure to do so can +// prevent Receive from returning. // // Each SubscriberClient may have only one invocation of Receive active at a // time. -func (s *SubscriberClient) Receive(ctx context.Context, f MessageReceiverFunc) error { +func (s *SubscriberClient) Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error { if err := s.setReceiveActive(true); err != nil { return err } diff --git a/pubsublite/pscompat/subscriber_test.go b/pubsublite/pscompat/subscriber_test.go index a614a744cda..429c9f55141 100644 --- a/pubsublite/pscompat/subscriber_test.go +++ b/pubsublite/pscompat/subscriber_test.go @@ -121,7 +121,7 @@ func (f *mockWireSubscriberFactory) New(receiver wire.MessageReceiverFunc) (wire }, nil } -func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc) *subscriberInstance { +func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver messageReceiverFunc) *subscriberInstance { sub, _ := newSubscriberInstance(ctx, new(mockWireSubscriberFactory), settings, receiver) return sub }