Skip to content

Commit

Permalink
docs(pubsublite): extra examples and documentation clean up (#3663)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Feb 9, 2021
1 parent 28decb5 commit eebb48a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pubsublite/README.md
Expand Up @@ -27,7 +27,7 @@ To publish messages to a topic:
const topic = "projects/project-id/locations/us-central1-b/topics/topic1"
publisher, err := pscompat.NewPublisherClient(ctx, topic)
if err != nil {
log.Fatal(err)
log.Fatal(err)
}

// Publish "hello world".
Expand Down
45 changes: 31 additions & 14 deletions pubsublite/doc.go
Expand Up @@ -29,17 +29,34 @@ https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
More information about Pub/Sub Lite is available at
https://cloud.google.com/pubsub/lite.
Note: This library is in ALPHA. Backwards-incompatible changes may be made
before stable v1.0.0 is released.
Introduction
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Note: This library is in ALPHA. Backwards-incompatible changes may be made
before stable v1.0.0 is released.
The following imports are required for code snippets below:
import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/pscompat"
)
More complete examples can be found at
https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
Creating Topics
Messages are published to topics. Pub/Sub Lite topics may be created like so:
ctx := context.Background()
const topicPath = "projects/my-project/locations/us-central1-c/topics/my-topic"
topicConfig := pubsublite.TopicConfig{
Name: topicPath,
Expand All @@ -53,16 +70,15 @@ Messages are published to topics. Pub/Sub Lite topics may be created like so:
if err != nil {
// TODO: Handle error.
}
topic, err = adminClient.CreateTopic(ctx, topicConfig)
if err != nil {
if _, err = adminClient.CreateTopic(ctx, topicConfig); err != nil {
// TODO: Handle error.
}
See https://cloud.google.com/pubsub/lite/docs/topics for more information about
how Pub/Sub Lite topics are configured.
See https://cloud.google.com/pubsub/lite/docs/locations for the list of regions
and zones where Pub/Sub Lite is available.
See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones
where Pub/Sub Lite is available.
Publishing
Expand Down Expand Up @@ -100,8 +116,9 @@ service:
// TODO: Handle error.
}
Once you've finishing publishing, call Stop to flush all messages to the service
and close gRPC streams:
Once you've finishing publishing all messages, call Stop to flush all messages
to the service and close gRPC streams. The PublisherClient can no longer be used
after it has been stopped or has terminated due to a permanent service error.
publisher.Stop()
Expand All @@ -123,8 +140,7 @@ Pub/Sub Lite subscriptions may be created like so:
Topic: topicPath,
DeliveryRequirement: pubsublite.DeliverImmediately,
}
subscription, err = adminClient.CreateSubscription(ctx, subscriptionConfig)
if err != nil {
if _, err = adminClient.CreateSubscription(ctx, subscriptionConfig); err != nil {
// TODO: Handle error.
}
Expand All @@ -138,7 +154,9 @@ To receive messages for a subscription, first create a SubscriberClient:
subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath)
Messages are then consumed from a subscription via callback.
Messages are then consumed from a subscription via callback. The callback may be
invoked concurrently by multiple goroutines (one per partition that the
subscriber client is connected to).
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
Expand All @@ -149,9 +167,8 @@ Messages are then consumed from a subscription via callback.
// TODO: Handle error.
}
The callback may be invoked concurrently by multiple goroutines (one per
partition that the subscriber client is connected to). To terminate a call to
Receive, cancel its context:
Receive blocks until either the context is canceled or a fatal service error
occurs. To terminate a call to Receive, cancel its context:
cancel()
Expand Down
17 changes: 17 additions & 0 deletions pubsublite/example_test.go
Expand Up @@ -22,8 +22,14 @@ import (
"google.golang.org/api/iterator"
)

// This example demonstrates how to create a new topic.
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
// about how Pub/Sub Lite topics are configured.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of zones
// where Pub/Sub Lite is available.
func ExampleAdminClient_CreateTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -47,6 +53,7 @@ func ExampleAdminClient_CreateTopic() {

func ExampleAdminClient_UpdateTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -67,6 +74,7 @@ func ExampleAdminClient_UpdateTopic() {

func ExampleAdminClient_DeleteTopic() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -80,6 +88,7 @@ func ExampleAdminClient_DeleteTopic() {

func ExampleAdminClient_Topics() {
ctx := context.Background()
// NOTE: region must correspond to the zone below.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -101,6 +110,7 @@ func ExampleAdminClient_Topics() {

func ExampleAdminClient_TopicSubscriptions() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -121,8 +131,12 @@ func ExampleAdminClient_TopicSubscriptions() {
}
}

// This example demonstrates how to create a new subscription for a topic.
// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more
// information about how subscriptions are configured.
func ExampleAdminClient_CreateSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the topic and subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -143,6 +157,7 @@ func ExampleAdminClient_CreateSubscription() {

func ExampleAdminClient_UpdateSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -162,6 +177,7 @@ func ExampleAdminClient_UpdateSubscription() {

func ExampleAdminClient_DeleteSubscription() {
ctx := context.Background()
// NOTE: region must correspond to the zone of the subscription.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand All @@ -175,6 +191,7 @@ func ExampleAdminClient_DeleteSubscription() {

func ExampleAdminClient_Subscriptions() {
ctx := context.Background()
// NOTE: region must correspond to the zone below.
admin, err := pubsublite.NewAdminClient(ctx, "region")
if err != nil {
// TODO: Handle error.
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/pscompat/doc.go
Expand Up @@ -15,9 +15,9 @@
Package pscompat contains clients for publishing and subscribing using the
Pub/Sub Lite service.
The clients in this package are designed to compatible with the Cloud Pub/Sub
library: https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are
defined by the client, PublisherClient and SubscriberClient can be used as
This package is designed to compatible with the Cloud Pub/Sub library:
https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the
client application, PublisherClient and SubscriberClient can be used as
substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(),
respectively, from the pubsub package.
Expand Down
40 changes: 36 additions & 4 deletions pubsublite/pscompat/example_test.go
Expand Up @@ -36,7 +36,7 @@ func ExamplePublisherClient_Publish() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
Expand Down Expand Up @@ -68,7 +68,7 @@ func ExamplePublisherClient_Publish_batchingSettings() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
Expand All @@ -92,13 +92,15 @@ func ExamplePublisherClient_Error() {
Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
// Publish more messages ...
for _, r := range results {
id, err := r.Get(ctx)
if err != nil {
// TODO: Handle error.
if err == pscompat.ErrPublisherStopped {
// Prints the fatal error that caused the publisher to terminate.
fmt.Printf("Publisher client stopped due to error: %v\n", publisher.Error())
break
}
}
fmt.Printf("Published a message with a message ID: %s\n", id)
Expand Down Expand Up @@ -151,6 +153,36 @@ func ExampleSubscriberClient_Receive_maxOutstanding() {
// TODO: Handle error.
}

// Call cancel from callback, or another goroutine.
// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}

// This example shows how to manually assign which topic partitions a
// SubscriberClient should connect to. If not specified, the SubscriberClient
// will use Pub/Sub Lite's partition assignment service to automatically
// determine which partitions it should connect to.
func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
ctx := context.Background()
const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription"
settings := pscompat.DefaultReceiveSettings
// NOTE: The corresponding topic must have 2 or more partitions.
settings.Partitions = []int{0, 1}
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings)
if err != nil {
// TODO: Handle error.
}
cctx, cancel := context.WithCancel(ctx)
err = subscriber.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
// TODO: Handle message.
// NOTE: May be called concurrently; synchronize access to shared memory.
m.Ack()
})
if err != nil {
// TODO: Handle error.
}

// Call cancel from the receiver callback or another goroutine to stop
// receiving.
cancel()
}

0 comments on commit eebb48a

Please sign in to comment.