From ade7981d6f650e3c359305d9b06e819f02258dc0 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 23 Jun 2021 08:33:19 -0700 Subject: [PATCH] samples: add publish with flow control sample (#717) * samples: add publish with flow control sample * address kamal's comments --- .../pubsub/PublishWithFlowControlExample.java | 94 +++++++++++++++++++ .../src/test/java/pubsub/PublisherIT.java | 5 + 2 files changed, 99 insertions(+) create mode 100644 samples/snippets/src/main/java/pubsub/PublishWithFlowControlExample.java diff --git a/samples/snippets/src/main/java/pubsub/PublishWithFlowControlExample.java b/samples/snippets/src/main/java/pubsub/PublishWithFlowControlExample.java new file mode 100644 index 000000000..b1d4a8ef8 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/PublishWithFlowControlExample.java @@ -0,0 +1,94 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_publisher_flow_control] + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class PublishWithFlowControlExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "your-project-id"; + String topicId = "your-topic-id"; + + publishWithFlowControlExample(projectId, topicId); + } + + public static void publishWithFlowControlExample(String projectId, String topicId) + throws IOException, ExecutionException, InterruptedException { + TopicName topicName = TopicName.of(projectId, topicId); + Publisher publisher = null; + List> messageIdFutures = new ArrayList<>(); + + try { + // Configure how many messages the publisher client can hold in memory + // and what to do when messages exceed the limit. + FlowControlSettings flowControlSettings = + FlowControlSettings.newBuilder() + // Block more messages from being published when the limit is reached. The other + // options are Ignore (or continue publishing) and ThrowException (or error out). + .setLimitExceededBehavior(LimitExceededBehavior.Block) + .setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB + .setMaxOutstandingElementCount(100L) // 100 messages + .build(); + + // By default, messages are not batched. + BatchingSettings batchingSettings = + BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build(); + + publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build(); + + // Publish 1000 messages in quick succession may be constrained by publisher flow control. + for (int i = 0; i < 1000; i++) { + String message = "message " + i; + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + + // Once published, returns a server-assigned message id (unique within the topic) + ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + messageIdFutures.add(messageIdFuture); + } + } finally { + // Wait on any pending publish requests. + List messageIds = ApiFutures.allAsList(messageIdFutures).get(); + + System.out.println( + "Published " + messageIds.size() + " messages with flow control settings."); + + if (publisher != null) { + // When finished with the publisher, shut down to free up resources. + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + } +} +// [END pubsub_publisher_flow_control] diff --git a/samples/snippets/src/test/java/pubsub/PublisherIT.java b/samples/snippets/src/test/java/pubsub/PublisherIT.java index 0ed25c082..b22912d47 100644 --- a/samples/snippets/src/test/java/pubsub/PublisherIT.java +++ b/samples/snippets/src/test/java/pubsub/PublisherIT.java @@ -96,6 +96,11 @@ public void testPublisher() throws Exception { PublishWithBatchSettingsExample.publishWithBatchSettingsExample(projectId, topicId); assertThat(bout.toString()).contains("Published 100 messages with batch settings."); + bout.reset(); + // Test publish with flow control settings. + PublishWithFlowControlExample.publishWithFlowControlExample(projectId, topicId); + assertThat(bout.toString()).contains("Published 1000 messages with flow control settings."); + bout.reset(); // Test publish with concurrency control. PublishWithConcurrencyControlExample.publishWithConcurrencyControlExample(projectId, topicId);