diff --git a/pubsublite/README.md b/pubsublite/README.md index 8fd9370a3f4..ab37bd95b1e 100644 --- a/pubsublite/README.md +++ b/pubsublite/README.md @@ -24,8 +24,8 @@ To publish messages to a topic: ```go // Create a PublisherClient for topic1 in zone us-central1-b. // See https://cloud.google.com/pubsub/lite/docs/locations for available zones. -const topic = "projects/project-id/locations/us-central1-b/topics/topic1", -publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic) +const topic = "projects/project-id/locations/us-central1-b/topics/topic1" +publisher, err := pscompat.NewPublisherClient(ctx, topic) if err != nil { log.Fatal(err) } @@ -49,7 +49,7 @@ To receive messages for a subscription: ```go // Create a SubscriberClient for subscription1 in zone us-central1-b. const subscription = "projects/project-id/locations/us-central1-b/subscriptions/subscription1" -subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscription) +subscriber, err := pscompat.NewSubscriberClient(ctx, subscription) if err != nil { log.Fatal(err) } diff --git a/pubsublite/doc.go b/pubsublite/doc.go index de8f96d7507..4b1def088e5 100644 --- a/pubsublite/doc.go +++ b/pubsublite/doc.go @@ -77,7 +77,7 @@ differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat. To publish messages to a topic, first create a PublisherClient: - publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topicPath) + publisher, err := pscompat.NewPublisherClient(ctx, topicPath) if err != nil { // TODO: Handle error. } @@ -136,7 +136,7 @@ Receiving To receive messages for a subscription, first create a SubscriberClient: - subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscriptionPath) + subscriber, err := pscompat.NewSubscriberClient(ctx, subscriptionPath) Messages are then consumed from a subscription via callback. diff --git a/pubsublite/pscompat/example_test.go b/pubsublite/pscompat/example_test.go index 22040e15196..67a18c7f4d4 100644 --- a/pubsublite/pscompat/example_test.go +++ b/pubsublite/pscompat/example_test.go @@ -16,6 +16,7 @@ package pscompat_test import ( "context" "fmt" + "time" "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsublite/pscompat" @@ -24,8 +25,39 @@ import ( func ExamplePublisherClient_Publish() { ctx := context.Background() const topic = "projects/my-project/locations/zone/topics/my-topic" - // NOTE: DefaultPublishSettings and empty PublishSettings{} are equivalent. - publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic) + publisher, err := pscompat.NewPublisherClient(ctx, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + var results []*pubsub.PublishResult + r := publisher.Publish(ctx, &pubsub.Message{ + Data: []byte("hello world"), + }) + results = append(results, r) + // Do other work ... + for _, r := range results { + id, err := r.Get(ctx) + if err != nil { + // TODO: Handle error. + } + fmt.Printf("Published a message with a message ID: %s\n", id) + } +} + +// This example illustrates how to set batching settings for publishing. Note +// that batching settings apply per partition. If BufferedByteLimit is being +// used to bound memory usage, keep in mind the number of partitions in the +// topic. +func ExamplePublisherClient_Publish_batchingSettings() { + ctx := context.Background() + const topic = "projects/my-project/locations/zone/topics/my-topic" + settings := pscompat.DefaultPublishSettings + settings.DelayThreshold = 50 * time.Millisecond + settings.CountThreshold = 200 + settings.BufferedByteLimit = 5e8 + publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topic, settings) if err != nil { // TODO: Handle error. } @@ -49,7 +81,7 @@ func ExamplePublisherClient_Publish() { func ExamplePublisherClient_Error() { ctx := context.Background() const topic = "projects/my-project/locations/zone/topics/my-topic" - publisher, err := pscompat.NewPublisherClient(ctx, pscompat.DefaultPublishSettings, topic) + publisher, err := pscompat.NewPublisherClient(ctx, topic) if err != nil { // TODO: Handle error. } @@ -76,8 +108,7 @@ func ExamplePublisherClient_Error() { func ExampleSubscriberClient_Receive() { ctx := context.Background() const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" - // NOTE: DefaultReceiveSettings and empty ReceiveSettings{} are equivalent. - subscriber, err := pscompat.NewSubscriberClient(ctx, pscompat.DefaultReceiveSettings, subscription) + subscriber, err := pscompat.NewSubscriberClient(ctx, subscription) if err != nil { // TODO: Handle error. } @@ -106,7 +137,7 @@ func ExampleSubscriberClient_Receive_maxOutstanding() { settings := pscompat.DefaultReceiveSettings settings.MaxOutstandingMessages = 5 settings.MaxOutstandingBytes = 10e6 - subscriber, err := pscompat.NewSubscriberClient(ctx, settings, subscription) + subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscription, settings) if err != nil { // TODO: Handle error. } diff --git a/pubsublite/pscompat/integration_test.go b/pubsublite/pscompat/integration_test.go index 423040e1cd5..eefae8a6209 100644 --- a/pubsublite/pscompat/integration_test.go +++ b/pubsublite/pscompat/integration_test.go @@ -81,7 +81,7 @@ func adminClient(ctx context.Context, t *testing.T, region string, opts ...optio func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings, topic wire.TopicPath, opts ...option.ClientOption) *PublisherClient { opts = testOptions(ctx, t, opts...) - pub, err := NewPublisherClient(ctx, settings, topic.String(), opts...) + pub, err := NewPublisherClientWithSettings(ctx, topic.String(), settings, opts...) if err != nil { t.Fatalf("Failed to create publisher client: %v", err) } @@ -90,7 +90,7 @@ func publisherClient(ctx context.Context, t *testing.T, settings PublishSettings func subscriberClient(ctx context.Context, t *testing.T, settings ReceiveSettings, subscription wire.SubscriptionPath, opts ...option.ClientOption) *SubscriberClient { opts = testOptions(ctx, t, opts...) - sub, err := NewSubscriberClient(ctx, settings, subscription.String(), opts...) + sub, err := NewSubscriberClientWithSettings(ctx, subscription.String(), settings, opts...) if err != nil { t.Fatalf("Failed to create publisher client: %v", err) } diff --git a/pubsublite/pscompat/publisher.go b/pubsublite/pscompat/publisher.go index b4e51e6bcb1..4527dff6af9 100644 --- a/pubsublite/pscompat/publisher.go +++ b/pubsublite/pscompat/publisher.go @@ -68,10 +68,17 @@ type PublisherClient struct { err error } -// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to a -// given topic. A valid topic path has the format: -// "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". -func NewPublisherClient(ctx context.Context, settings PublishSettings, topic string, opts ...option.ClientOption) (*PublisherClient, error) { +// NewPublisherClient creates a new Pub/Sub Lite client to publish messages to +// a given topic, using DefaultPublishSettings. A valid topic path has the +// format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". +func NewPublisherClient(ctx context.Context, topic string, opts ...option.ClientOption) (*PublisherClient, error) { + return NewPublisherClientWithSettings(ctx, topic, DefaultPublishSettings, opts...) +} + +// NewPublisherClientWithSettings creates a new Pub/Sub Lite client to publish +// messages to a given topic, using the specified PublishSettings. A valid topic +// path has the format: "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID". +func NewPublisherClientWithSettings(ctx context.Context, topic string, settings PublishSettings, opts ...option.ClientOption) (*PublisherClient, error) { topicPath, err := wire.ParseTopicPath(topic) if err != nil { return nil, err diff --git a/pubsublite/pscompat/settings.go b/pubsublite/pscompat/settings.go index 3d3032ee27f..175ef3275f7 100644 --- a/pubsublite/pscompat/settings.go +++ b/pubsublite/pscompat/settings.go @@ -63,8 +63,8 @@ type PublishSettings struct { // DefaultPublishSettings.ByteThreshold. Otherwise must be > 0. ByteThreshold int - // The maximum time that the client will attempt to establish a publish stream - // connection to the server. If Timeout is 0, it will be treated as + // The maximum time that the client will attempt to open a publish stream + // to the server. If Timeout is 0, it will be treated as // DefaultPublishSettings.Timeout. Otherwise must be > 0. // // The timeout is exceeded, the publisher will terminate with the last error @@ -170,8 +170,8 @@ type ReceiveSettings struct { // the associated topic. MaxOutstandingBytes int - // The maximum time that the client will attempt to establish a subscribe - // stream connection to the server. If Timeout is 0, it will be treated as + // The maximum time that the client will attempt to open a subscribe stream + // to the server. If Timeout is 0, it will be treated as // DefaultReceiveSettings.Timeout. Otherwise must be > 0. // // The timeout is exceeded, the SubscriberClient will terminate with the last diff --git a/pubsublite/pscompat/subscriber.go b/pubsublite/pscompat/subscriber.go index 760c2d9ed38..80f6100be4a 100644 --- a/pubsublite/pscompat/subscriber.go +++ b/pubsublite/pscompat/subscriber.go @@ -234,9 +234,18 @@ type SubscriberClient struct { } // NewSubscriberClient creates a new Pub/Sub Lite client to receive messages for -// a given subscription. A valid subscription path has the format: +// a given subscription, using DefaultReceiveSettings. A valid subscription path +// has the format: // "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". -func NewSubscriberClient(ctx context.Context, settings ReceiveSettings, subscription string, opts ...option.ClientOption) (*SubscriberClient, error) { +func NewSubscriberClient(ctx context.Context, subscription string, opts ...option.ClientOption) (*SubscriberClient, error) { + return NewSubscriberClientWithSettings(ctx, subscription, DefaultReceiveSettings, opts...) +} + +// NewSubscriberClientWithSettings creates a new Pub/Sub Lite client to receive +// messages for a given subscription, using the specified ReceiveSettings. A +// valid subscription path has the format: +// "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID". +func NewSubscriberClientWithSettings(ctx context.Context, subscription string, settings ReceiveSettings, opts ...option.ClientOption) (*SubscriberClient, error) { subscriptionPath, err := wire.ParseSubscriptionPath(subscription) if err != nil { return nil, err