Skip to content

Commit

Permalink
fix(pubsublite): make SubscriberClient.Receive identical to pubsub (#…
Browse files Browse the repository at this point in the history
…4281)

- Ensures that pscompat.SubscriberClient.Receive and pubsub.Subscription.Receive have identical interfaces.
- Adds examples for how to declare common interfaces for pubsublite and pubsub.
  • Loading branch information
tmdiep committed Jun 22, 2021
1 parent 634847b commit 5b5d0f7
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 38 deletions.
36 changes: 21 additions & 15 deletions pubsublite/doc.go
Expand Up @@ -29,14 +29,30 @@ https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
More information about Pub/Sub Lite is available at
https://cloud.google.com/pubsub/lite.
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Note: This library is in BETA. Backwards-incompatible changes may be made before
stable v1.0.0 is released.
Introduction
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Examples can be found at
https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
Complete sample programs can be found at
https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite.
The cloud.google.com/go/pubsublite/pscompat subpackage contains clients for
publishing and receiving messages, which have similar interfaces to their
pubsub.Topic and pubsub.Subscription counterparts in cloud.google.com/go/pubsub.
The following examples demonstrate how to declare common interfaces:
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface.
The following imports are required for code snippets below:
Expand All @@ -46,11 +62,6 @@ The following imports are required for code snippets below:
"cloud.google.com/go/pubsublite/pscompat"
)
More complete examples can be found at
https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
Creating Topics
Expand Down Expand Up @@ -83,11 +94,6 @@ where Pub/Sub Lite is available.
Publishing
The pubsublite/pscompat subpackage contains clients for publishing and receiving
messages, which have similar interfaces to their pubsub.Topic and
pubsub.Subscription counterparts in the Cloud Pub/Sub library:
https://pkg.go.dev/cloud.google.com/go/pubsub.
Pub/Sub Lite uses gRPC streams extensively for high throughput. For more
differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat.
Expand Down Expand Up @@ -118,7 +124,7 @@ service:
Once you've finishing publishing all messages, call Stop to flush all messages
to the service and close gRPC streams. The PublisherClient can no longer be used
after it has been stopped or has terminated due to a permanent service error.
after it has been stopped or has terminated due to a permanent error.
publisher.Stop()
Expand Down Expand Up @@ -167,8 +173,8 @@ subscriber client is connected to).
// TODO: Handle error.
}
Receive blocks until either the context is canceled or a fatal service error
occurs. To terminate a call to Receive, cancel its context:
Receive blocks until either the context is canceled or a permanent error occurs.
To terminate a call to Receive, cancel its context:
cancel()
Expand Down
3 changes: 1 addition & 2 deletions pubsublite/internal/wire/README.md
@@ -1,7 +1,6 @@
# Wire

This directory contains internal implementation details for Cloud Pub/Sub Lite.
Its exported interface can change at any time.
This directory contains internal implementation details for Pub/Sub Lite.

## Conventions

Expand Down
8 changes: 7 additions & 1 deletion pubsublite/pscompat/doc.go
Expand Up @@ -19,7 +19,10 @@ This package is designed to compatible with the Cloud Pub/Sub library:
https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the
client application, PublisherClient and SubscriberClient can be used as
substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(),
respectively, from the pubsub package.
respectively, from the pubsub package. See the following examples:
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface.
The Cloud Pub/Sub and Pub/Sub Lite services have some differences:
- Pub/Sub Lite does not support NACK for messages. By default, this will
Expand All @@ -42,6 +45,9 @@ https://cloud.google.com/pubsub/lite.
Information about choosing between Cloud Pub/Sub vs Pub/Sub Lite is available at
https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
Complete sample programs can be found at
https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite.
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
*/
Expand Down
64 changes: 64 additions & 0 deletions pubsublite/pscompat/example_test.go
Expand Up @@ -292,6 +292,70 @@ func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
cancel()
}

// This example illustrates how to declare a common interface for publisher
// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite
// (cloud.google.com/go/pubsublite/pscompat).
func ExampleNewPublisherClient_interface() {
// publisherInterface is implemented by both pscompat.PublisherClient and
// pubsub.Topic.
type publisherInterface interface {
Publish(context.Context, *pubsub.Message) *pubsub.PublishResult
Stop()
}

publish := func(publisher publisherInterface) {
defer publisher.Stop()
// TODO: Publish messages.
}

// Create a Pub/Sub Lite publisher client.
ctx := context.Background()
publisher, err := pscompat.NewPublisherClient(ctx, "projects/my-project/locations/zone/topics/my-topic")
if err != nil {
// TODO: Handle error.
}
publish(publisher)

// Create a Cloud Pub/Sub topic to publish.
client, err := pubsub.NewClient(ctx, "my-project")
if err != nil {
// TODO: Handle error.
}
topic := client.Topic("my-topic")
publish(topic)
}

// This example illustrates how to declare a common interface for subscriber
// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite
// (cloud.google.com/go/pubsublite/pscompat).
func ExampleNewSubscriberClient_interface() {
// subscriberInterface is implemented by both pscompat.SubscriberClient and
// pubsub.Subscription.
type subscriberInterface interface {
Receive(context.Context, func(context.Context, *pubsub.Message)) error
}

receive := func(subscriber subscriberInterface) {
// TODO: Receive messages.
}

// Create a Pub/Sub Lite subscriber client.
ctx := context.Background()
subscriber, err := pscompat.NewSubscriberClient(ctx, "projects/my-project/locations/zone/subscriptions/my-subscription")
if err != nil {
// TODO: Handle error.
}
receive(subscriber)

// Create a Cloud Pub/Sub subscription to receive.
client, err := pubsub.NewClient(ctx, "my-project")
if err != nil {
// TODO: Handle error.
}
subscription := client.Subscription("my-subscription")
receive(subscription)
}

func ExampleParseMessageMetadata_publisher() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
Expand Down
31 changes: 12 additions & 19 deletions pubsublite/pscompat/subscriber.go
Expand Up @@ -86,11 +86,13 @@ func (f *wireSubscriberFactoryImpl) New(receiver wire.MessageReceiverFunc) (wire
return wire.NewSubscriber(context.Background(), f.settings, receiver, f.region, f.subscription.String(), f.options...)
}

type messageReceiverFunc = func(context.Context, *pubsub.Message)

// subscriberInstance wraps an instance of a wire.Subscriber. A new instance is
// created for each invocation of SubscriberClient.Receive().
type subscriberInstance struct {
settings ReceiveSettings
receiver MessageReceiverFunc
receiver messageReceiverFunc
recvCtx context.Context // Context passed to the receiver
recvCancel context.CancelFunc // Corresponding cancel func for recvCtx
wireSub wire.Subscriber
Expand All @@ -101,7 +103,7 @@ type subscriberInstance struct {
err error
}

func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver MessageReceiverFunc) (*subscriberInstance, error) {
func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver messageReceiverFunc) (*subscriberInstance, error) {
recvCtx, recvCancel := context.WithCancel(ctx)
subInstance := &subscriberInstance{
settings: settings,
Expand Down Expand Up @@ -221,17 +223,6 @@ func (si *subscriberInstance) Wait(ctx context.Context) error {
return err
}

// MessageReceiverFunc handles messages sent by the Pub/Sub Lite service.
//
// The implementation must arrange for pubsub.Message.Ack() or
// pubsub.Message.Nack() to be called after processing the message.
//
// The receiver func will be called from multiple goroutines if the subscriber
// is connected to multiple partitions. Only one call from any connected
// partition will be outstanding at a time, and blocking in this receiver
// callback will block the delivery of subsequent messages for the partition.
type MessageReceiverFunc func(context.Context, *pubsub.Message)

// SubscriberClient is a Pub/Sub Lite client to receive messages for a given
// subscription.
//
Expand Down Expand Up @@ -292,18 +283,20 @@ func NewSubscriberClientWithSettings(ctx context.Context, subscription string, s
// If there is a fatal service error, Receive returns that error after all of
// the outstanding calls to f have returned. If ctx is done, Receive returns nil
// after all of the outstanding calls to f have returned and all messages have
// been acknowledged.
// been acknowledged. The context passed to f will be canceled when ctx is Done
// or there is a fatal service error.
//
// Receive calls f concurrently from multiple goroutines if the SubscriberClient
// is connected to multiple partitions. All messages received by f must be ACKed
// or NACKed. Failure to do so can prevent Receive from returning.
// is connected to multiple partitions. Only one call from any connected
// partition will be outstanding at a time, and blocking in the receiver
// callback f will block the delivery of subsequent messages for the partition.
//
// The context passed to f will be canceled when ctx is Done or there is a fatal
// service error.
// All messages received by f must be ACKed or NACKed. Failure to do so can
// prevent Receive from returning.
//
// Each SubscriberClient may have only one invocation of Receive active at a
// time.
func (s *SubscriberClient) Receive(ctx context.Context, f MessageReceiverFunc) error {
func (s *SubscriberClient) Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error {
if err := s.setReceiveActive(true); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/pscompat/subscriber_test.go
Expand Up @@ -121,7 +121,7 @@ func (f *mockWireSubscriberFactory) New(receiver wire.MessageReceiverFunc) (wire
}, nil
}

func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc) *subscriberInstance {
func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver messageReceiverFunc) *subscriberInstance {
sub, _ := newSubscriberInstance(ctx, new(mockWireSubscriberFactory), settings, receiver)
return sub
}
Expand Down

0 comments on commit 5b5d0f7

Please sign in to comment.