From 19dd12903757fa16075f8dab4ed5a2fa0bbe5299 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 25 Nov 2019 10:04:56 -0500 Subject: [PATCH 01/14] Modifying Publish example in README to match other examples given, and fix issue #6784 --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b928096b..ca3e1c833 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,10 @@ With Pub/Sub you can publish messages to a topic. Add the following import at th ```java import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.Publisher; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; ``` @@ -135,6 +138,16 @@ try { ByteString data = ByteString.copyFromUtf8("my-message"); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() { + public void onSuccess(String messageId) { + System.out.println("published with message id: " + messageId); + } + + public void onFailure(Throwable t) { + System.out.println("failed to publish: " + t); + } + }, MoreExecutors.directExecutor()); + //... } finally { if (publisher != null) { publisher.shutdown(); @@ -284,4 +297,4 @@ Java 11 | [![Kokoro CI][kokoro-badge-image-5]][kokoro-badge-link-5] [troubleshooting]: https://github.com/googleapis/google-cloud-common/blob/master/troubleshooting/readme.md#troubleshooting [contributing]: https://github.com/googleapis/java-pubsub/blob/master/CONTRIBUTING.md [code-of-conduct]: https://github.com/googleapis/java-pubsub/blob/master/CODE_OF_CONDUCT.md#contributor-code-of-conduct -[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE \ No newline at end of file +[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE From 41585299b46e4a471252c124bb412a92d371ce7a Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 25 Nov 2019 10:04:56 -0500 Subject: [PATCH 02/14] fix: Modifying Publish example in README to match other examples, and fix Issue #11 --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 3b928096b..ca3e1c833 100644 --- a/README.md +++ b/README.md @@ -122,7 +122,10 @@ With Pub/Sub you can publish messages to a topic. Add the following import at th ```java import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.cloud.pubsub.v1.Publisher; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; ``` @@ -135,6 +138,16 @@ try { ByteString data = ByteString.copyFromUtf8("my-message"); PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() { + public void onSuccess(String messageId) { + System.out.println("published with message id: " + messageId); + } + + public void onFailure(Throwable t) { + System.out.println("failed to publish: " + t); + } + }, MoreExecutors.directExecutor()); + //... } finally { if (publisher != null) { publisher.shutdown(); @@ -284,4 +297,4 @@ Java 11 | [![Kokoro CI][kokoro-badge-image-5]][kokoro-badge-link-5] [troubleshooting]: https://github.com/googleapis/google-cloud-common/blob/master/troubleshooting/readme.md#troubleshooting [contributing]: https://github.com/googleapis/java-pubsub/blob/master/CONTRIBUTING.md [code-of-conduct]: https://github.com/googleapis/java-pubsub/blob/master/CODE_OF_CONDUCT.md#contributor-code-of-conduct -[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE \ No newline at end of file +[license]: https://github.com/googleapis/java-pubsub/blob/master/LICENSE From 01d41b726a0c5655b079d641594ef7dd9c279672 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 13 Jan 2020 12:40:07 -0500 Subject: [PATCH 03/14] feat: Adding support for DLQs Adding delivery attempt count to PubsubMessages as a message attribute, and creating helper function to allow users to get the count without knowing implementation details. --- .../com/google/cloud/pubsub/v1/MessageDispatcher.java | 9 ++++++++- .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 8 ++++++++ .../google/cloud/pubsub/v1/MessageDispatcherTest.java | 8 +++++++- .../java/com/google/cloud/pubsub/v1/SubscriberTest.java | 9 +++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 53b979d5e..34b482cd8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -341,10 +341,17 @@ private void processBatch(List batch) { // This should be a blocking flow controller and never throw an exception. throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler); + processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler); } } + private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) { + return PubsubMessage.newBuilder(receivedMessage.getMessage()) + .putAttributes( + "googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt())) + .build(); + } + private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { final SettableApiFuture response = SettableApiFuture.create(); final AckReplyConsumer consumer = diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 059ac0ce4..d08ae921b 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -44,6 +44,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -205,6 +206,13 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) return new Builder(subscription, receiver); } + public static int getDeliveryAttempt(PubsubMessage message) { + if (!message.containsAttributes("googclient_deliveryattempt")) { + throw new RuntimeException("Message does not contain delivery attempt information"); + } + return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt")); + } + /** Subscription which the subscriber is subscribed to. */ public String getSubscriptionNameString() { return subscriptionName; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 163475407..b84e82c8c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -37,10 +37,13 @@ import org.threeten.bp.Duration; public class MessageDispatcherTest { + private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); + private static final int DELIVERY_INFO_COUNT = 3; private static final ReceivedMessage TEST_MESSAGE = ReceivedMessage.newBuilder() .setAckId("ackid") - .setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build()) + .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) .build(); private static final Runnable NOOP_RUNNABLE = new Runnable() { @@ -78,6 +81,9 @@ public void setUp() { new MessageReceiver() { @Override public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + assertThat(message.getData()).isEqualTo(MESSAGE_DATA); + assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) + .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); consumers.add(consumer); } }; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 5512145e1..4f0a9600d 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -84,6 +84,15 @@ public void tearDown() throws Exception { testChannel.shutdown(); } + @Test + public void testDeliveryAttemptHelper() { + int deliveryAttempt = 3; + PubsubMessage message = PubsubMessage.newBuilder() + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build(); + assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); + } + @Test public void testOpenedChannels() throws Exception { int expectedChannelCount = 1; From ed3b1eeefcc5b5eaa941e6138ab9c9850553577b Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 13 Jan 2020 12:53:49 -0500 Subject: [PATCH 04/14] Fix formatting --- .../java/com/google/cloud/pubsub/v1/SubscriberTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 4f0a9600d..11a67c0d3 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -87,9 +87,10 @@ public void tearDown() throws Exception { @Test public void testDeliveryAttemptHelper() { int deliveryAttempt = 3; - PubsubMessage message = PubsubMessage.newBuilder() - .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) - .build(); + PubsubMessage message = + PubsubMessage.newBuilder() + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build(); assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); } From 72f7996fd7c0fd9f5e786c151ca2168b8eefb520 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Tue, 14 Jan 2020 09:27:51 -0500 Subject: [PATCH 05/14] fix: making changes requested in pull request --- .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 6 ++---- .../java/com/google/cloud/pubsub/v1/SubscriberTest.java | 3 +++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index d08ae921b..422a0577f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -206,11 +206,9 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) return new Builder(subscription, receiver); } + /** Returns the delivery attempt count for a received {@link PubsubMessage} */ public static int getDeliveryAttempt(PubsubMessage message) { - if (!message.containsAttributes("googclient_deliveryattempt")) { - throw new RuntimeException("Message does not contain delivery attempt information"); - } - return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt")); + return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0")); } /** Subscription which the subscriber is subscribed to. */ diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 11a67c0d3..8f8489f21 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -92,6 +92,9 @@ public void testDeliveryAttemptHelper() { .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) .build(); assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); + + PubsubMessage emptyMessage = PubsubMessage.newBuilder().build(); + assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0); } @Test From 3738ec8c1cf3bb3b60c65a533927e2db5c3e8699 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Wed, 29 Jan 2020 13:24:23 -0500 Subject: [PATCH 06/14] fix: creating fix to not populate delivery attempt attribute when dead lettering is not enabled --- .../google/cloud/pubsub/v1/MessageDispatcher.java | 14 ++++++++++---- .../com/google/cloud/pubsub/v1/Subscriber.java | 7 +++++-- .../com/google/cloud/pubsub/v1/SubscriberTest.java | 5 +++-- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 34b482cd8..55b8dd776 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -346,10 +346,16 @@ private void processBatch(List batch) { } private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) { - return PubsubMessage.newBuilder(receivedMessage.getMessage()) - .putAttributes( - "googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt())) - .build(); + PubsubMessage originalMessage = receivedMessage.getMessage(); + int deliveryAttempt = receivedMessage.getDeliveryAttempt(); + // Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In + // this case, do not populate the PubsubMessage with the delivery attempt attribute. + if (deliveryAttempt > 0) { + return PubsubMessage.newBuilder(originalMessage) + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build(); + } + return originalMessage; } private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 422a0577f..2deafb2e7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -207,8 +207,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) } /** Returns the delivery attempt count for a received {@link PubsubMessage} */ - public static int getDeliveryAttempt(PubsubMessage message) { - return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0")); + public static Integer getDeliveryAttempt(PubsubMessage message) { + if (!message.containsAttributes("googclient_deliveryattempt")) { + return null; + } + return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt")); } /** Subscription which the subscriber is subscribed to. */ diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 8f8489f21..95ad58d80 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -86,15 +86,16 @@ public void tearDown() throws Exception { @Test public void testDeliveryAttemptHelper() { - int deliveryAttempt = 3; + Integer deliveryAttempt = 3; PubsubMessage message = PubsubMessage.newBuilder() .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) .build(); assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); + // In the case where delivery attempt attribute is not populated, expect null PubsubMessage emptyMessage = PubsubMessage.newBuilder().build(); - assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0); + assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), null); } @Test From aecd4ca5c6941da7571249c4708f5798ac6db04c Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Thu, 30 Jan 2020 11:05:30 -0500 Subject: [PATCH 07/14] Adding unit test for case in which a received message has no delivery attempt --- .../pubsub/v1/MessageDispatcherTest.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index b84e82c8c..1745b370b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -17,6 +17,8 @@ package com.google.cloud.pubsub.v1; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; @@ -59,6 +61,7 @@ public void run() { private List sentModAcks; private FakeClock clock; private FlowController flowController; + private boolean messageContainsDeliveryAttempt; @AutoValue abstract static class ModAckItem { @@ -82,8 +85,13 @@ public void setUp() { @Override public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { assertThat(message.getData()).isEqualTo(MESSAGE_DATA); - assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) - .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); + if (messageContainsDeliveryAttempt) { + assertTrue(message.containsAttributes("googclient_deliveryattempt")); + assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) + .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); + } else { + assertFalse(message.containsAttributes("googclient_deliveryattempt")); + } consumers.add(consumer); } }; @@ -126,6 +134,8 @@ public void sendAckOperations( systemExecutor, clock); dispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); + + messageContainsDeliveryAttempt = true; } @Test @@ -136,6 +146,22 @@ public void testReceipt() { .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); } + @Test + public void testReceiptNoDeliveryAttempt() { + messageContainsDeliveryAttempt = false; + ReceivedMessage messageNoDeliveryAttempt = + ReceivedMessage.newBuilder() + .setAckId("ackid") + .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) + .build(); + dispatcher.processReceivedMessages(Collections.singletonList(messageNoDeliveryAttempt)); + dispatcher.processOutstandingAckOperations(); + assertThat(sentModAcks) + .contains( + ModAckItem.of( + messageNoDeliveryAttempt.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); + } + @Test public void testAck() throws Exception { dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); From 77fa3b35d7a0bf85cd348b551e6b784ac97ef67e Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Thu, 30 Jan 2020 17:03:31 -0500 Subject: [PATCH 08/14] Making MessageWaiter class more generic to also be used for outstanding ack operations --- .../cloud/pubsub/v1/MessageDispatcher.java | 12 ++++----- .../com/google/cloud/pubsub/v1/Publisher.java | 10 ++++---- .../v1/StreamingSubscriberConnection.java | 9 ++++++- .../google/cloud/pubsub/v1/Subscriber.java | 1 + .../v1/{MessageWaiter.java => Waiter.java} | 25 ++++++++++--------- ...MessageWaiterTest.java => WaiterTest.java} | 14 +++++------ 6 files changed, 40 insertions(+), 31 deletions(-) rename google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/{MessageWaiter.java => Waiter.java} (68%) rename google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/{MessageWaiterTest.java => WaiterTest.java} (80%) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 56fa8a3a9..c3f1a46cd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -72,7 +72,7 @@ class MessageDispatcher { private final AckProcessor ackProcessor; private final FlowController flowController; - private final MessageWaiter messagesWaiter; + private final Waiter messagesWaiter; // Maps ID to "total expiration time". If it takes longer than this, stop extending. private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>(); @@ -145,7 +145,7 @@ private void forget() { return; } flowController.release(1, outstandingBytes); - messagesWaiter.incrementPendingMessages(-1); + messagesWaiter.incrementPendingCount(-1); } @Override @@ -205,7 +205,7 @@ void sendAckOperations( // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS this.ackLatencyDistribution = ackLatencyDistribution; jobLock = new ReentrantLock(); - messagesWaiter = new MessageWaiter(); + messagesWaiter = new Waiter(); this.clock = clock; this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor); } @@ -268,7 +268,7 @@ public void run() { } void stop() { - messagesWaiter.waitNoMessages(); + messagesWaiter.waitComplete(); jobLock.lock(); try { if (backgroundJob != null) { @@ -331,9 +331,9 @@ void processReceivedMessages(List messages) { } private void processBatch(List batch) { - messagesWaiter.incrementPendingMessages(batch.size()); + messagesWaiter.incrementPendingCount(batch.size()); for (OutstandingMessage message : batch) { - // This is a blocking flow controller. We have already incremented MessageWaiter, so + // This is a blocking flow controller. We have already incremented messagesWaiter, so // shutdown will block on processing of all these messages anyway. try { flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize()); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 093590bb9..9af734db2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -104,7 +104,7 @@ public class Publisher { private final AtomicBoolean shutdown; private final BackgroundResource backgroundResources; - private final MessageWaiter messagesWaiter; + private final Waiter messagesWaiter; private ScheduledFuture currentAlarmFuture; private final ApiFunction messageTransform; @@ -173,7 +173,7 @@ private Publisher(Builder builder) throws IOException { backgroundResourceList.add(publisherStub); backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); shutdown = new AtomicBoolean(false); - messagesWaiter = new MessageWaiter(); + messagesWaiter = new Waiter(); } /** Topic which the publisher publishes to. */ @@ -249,7 +249,7 @@ public ApiFuture publish(PubsubMessage message) { messagesBatchLock.unlock(); } - messagesWaiter.incrementPendingMessages(1); + messagesWaiter.incrementPendingCount(1); // For messages without ordering keys, it is okay to send batches without holding // messagesBatchLock. @@ -423,7 +423,7 @@ public void onSuccess(PublishResponse result) { } } } finally { - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + messagesWaiter.incrementPendingCount(-outstandingBatch.size()); } } @@ -432,7 +432,7 @@ public void onFailure(Throwable t) { try { outstandingBatch.onFailure(t); } finally { - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + messagesWaiter.incrementPendingCount(-outstandingBatch.size()); } } }; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index ef3ce23d6..3909b039c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -72,6 +72,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); + private final Waiter ackOperationsWaiter = new Waiter(); private final Lock lock = new ReentrantLock(); private ClientStream clientStream; @@ -273,16 +274,18 @@ public void sendAckOperations( new ApiFutureCallback() { @Override public void onSuccess(Empty empty) { - // noop + ackOperationsWaiter.incrementPendingCount(-1); } @Override public void onFailure(Throwable t) { + ackOperationsWaiter.incrementPendingCount(-1); Level level = isAlive() ? Level.WARNING : Level.FINER; logger.log(level, "failed to send operations", t); } }; + int pendingOperations = 0; for (PendingModifyAckDeadline modack : ackDeadlineExtensions) { for (List idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) { ApiFuture future = @@ -294,6 +297,7 @@ public void onFailure(Throwable t) { .setAckDeadlineSeconds(modack.deadlineExtensionSeconds) .build()); ApiFutures.addCallback(future, loggingCallback, directExecutor()); + pendingOperations++; } } @@ -306,6 +310,9 @@ public void onFailure(Throwable t) { .addAllAckIds(idChunk) .build()); ApiFutures.addCallback(future, loggingCallback, directExecutor()); + pendingOperations++; } + + ackOperationsWaiter.incrementPendingCount(pendingOperations); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 2deafb2e7..4f9f66bff 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -305,6 +305,7 @@ public void run() { // stop connection is no-op if connections haven't been started. stopAllStreamingConnections(); shutdownBackgroundResources(); + subStub.shutdown(); notifyStopped(); } catch (Exception e) { notifyFailed(e); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java similarity index 68% rename from google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java rename to google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java index cb238f3d3..f540fdf1c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java @@ -18,25 +18,26 @@ import com.google.api.core.InternalApi; -/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */ -class MessageWaiter { - private int pendingMessages; +/** A barrier kind of object that helps keep track of pending actions and + * synchronously wait until all have completed. */ +class Waiter { + private int pendingCount; - MessageWaiter() { - pendingMessages = 0; + Waiter() { + pendingCount = 0; } - public synchronized void incrementPendingMessages(int messages) { - this.pendingMessages += messages; - if (pendingMessages == 0) { + public synchronized void incrementPendingCount(int delta) { + this.pendingCount += delta; + if (pendingCount == 0) { notifyAll(); } } - public synchronized void waitNoMessages() { + public synchronized void waitComplete() { boolean interrupted = false; try { - while (pendingMessages > 0) { + while (pendingCount > 0) { try { wait(); } catch (InterruptedException e) { @@ -52,7 +53,7 @@ public synchronized void waitNoMessages() { } @InternalApi - public int pendingMessages() { - return pendingMessages; + public int pendingCount() { + return pendingCount; } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java similarity index 80% rename from google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java rename to google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java index 5f8e19875..ca8618378 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java @@ -22,14 +22,14 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link MessageWaiter}. */ +/** Tests for {@link Waiter}. */ @RunWith(JUnit4.class) -public class MessageWaiterTest { +public class WaiterTest { @Test public void test() throws Exception { - final MessageWaiter waiter = new MessageWaiter(); - waiter.incrementPendingMessages(1); + final Waiter waiter = new Waiter(); + waiter.incrementPendingCount(1); final Thread mainThread = Thread.currentThread(); Thread t = @@ -40,14 +40,14 @@ public void run() { while (mainThread.getState() != Thread.State.WAITING) { Thread.yield(); } - waiter.incrementPendingMessages(-1); + waiter.incrementPendingCount(-1); } }); t.start(); - waiter.waitNoMessages(); + waiter.waitComplete(); t.join(); - assertEquals(0, waiter.pendingMessages()); + assertEquals(0, waiter.pendingCount()); } } From da0e355d71acc3de3c40c29b8528e955d2dac19a Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Thu, 30 Jan 2020 17:22:28 -0500 Subject: [PATCH 09/14] Waiting for acks to complete before shutting down a streaming subscriber connection --- .../google/cloud/pubsub/v1/StreamingSubscriberConnection.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 3909b039c..45f7fb90c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -117,6 +117,7 @@ protected void doStart() { @Override protected void doStop() { messageDispatcher.stop(); + ackOperationsWaiter.waitComplete(); lock.lock(); try { From dc55fc17c214f66f0a133064bec196d1b1e603dd Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Fri, 31 Jan 2020 12:06:56 -0500 Subject: [PATCH 10/14] Fixing formatting error --- .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 2 +- .../src/main/java/com/google/cloud/pubsub/v1/Waiter.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 4f9f66bff..9c84dea0a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -305,7 +305,7 @@ public void run() { // stop connection is no-op if connections haven't been started. stopAllStreamingConnections(); shutdownBackgroundResources(); - subStub.shutdown(); + subStub.shutdownNow(); notifyStopped(); } catch (Exception e) { notifyFailed(e); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java index f540fdf1c..e22125fee 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java @@ -18,8 +18,10 @@ import com.google.api.core.InternalApi; -/** A barrier kind of object that helps keep track of pending actions and - * synchronously wait until all have completed. */ +/** + * A barrier kind of object that helps keep track of pending actions and synchronously wait until + * all have completed. + */ class Waiter { private int pendingCount; From 548b7a70744a281b7bfdb5b0d0f0890c1f11b4d3 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Fri, 31 Jan 2020 14:20:26 -0500 Subject: [PATCH 11/14] fix: making sure all publishes complete before shutting down the publisher --- .../com/google/cloud/pubsub/v1/Publisher.java | 1 + .../cloud/pubsub/v1/PublisherImplTest.java | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 9af734db2..e28427eea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -527,6 +527,7 @@ public void shutdown() { currentAlarmFuture.cancel(false); } publishAllOutstanding(); + messagesWaiter.waitComplete(); backgroundResources.shutdown(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 08fdee7b6..b1109c8fc 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -195,6 +195,37 @@ public void testSinglePublishByNumBytes() throws Exception { publisher.awaitTermination(1, TimeUnit.MINUTES); } + @Test + public void testPublishByShutdown() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setDelayThreshold(Duration.ofSeconds(100)) + .setElementCountThreshold(10L) + .build()) + .build(); + + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("1").addMessageIds("2")); + + ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); + ApiFuture publishFuture2 = sendTestMessage(publisher, "B"); + + // Note we are not advancing time or reaching the count threshold but messages should + // still get published by call to shutdown + + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + + // Verify the publishes completed + assertTrue(publishFuture1.isDone()); + assertTrue(publishFuture2.isDone()); + assertEquals("1", publishFuture1.get()); + assertEquals("2", publishFuture2.get()); + } + @Test public void testPublishMixedSizeAndDuration() throws Exception { Publisher publisher = From b4439fd3e6c9f816c039d79935fa9cb8899748aa Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Mon, 24 Feb 2020 12:25:22 -0500 Subject: [PATCH 12/14] adding default max outstanding request bytes --- .../src/main/java/com/google/cloud/pubsub/v1/Subscriber.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 9c84dea0a..8baaf2472 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -416,7 +416,10 @@ public static final class Builder { private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; private FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build(); + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1000L) + .setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100MB + .build(); private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; private ExecutorProvider systemExecutorProvider = null; From 9a6381dd13012b609c06faea7e3d8be68872aca6 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Tue, 28 Apr 2020 22:28:24 -0400 Subject: [PATCH 13/14] fix: make push endpoint valid https --- .../src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index 356f1c207..bb2dab7b2 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -126,7 +126,7 @@ public void testVPCPushSubscriber() { subscriptionAdminClient.createSubscription( subscriptionName, topicName, - PushConfig.newBuilder().setPushEndpoint("random_point").build(), + PushConfig.newBuilder().setPushEndpoint("https://random_point").build(), 10); subscriptionAdminClient.deleteSubscription(subscriptionName); Assert.fail("No exception raised"); From 077f0551c307c96337834611d2df58eed81c20ac Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Sun, 24 May 2020 14:54:25 -0400 Subject: [PATCH 14/14] fix: update ignored diffs --- .../clirr-ignored-differences.xml | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/proto-google-cloud-pubsub-v1/clirr-ignored-differences.xml b/proto-google-cloud-pubsub-v1/clirr-ignored-differences.xml index 7937e7b03..da90cd601 100644 --- a/proto-google-cloud-pubsub-v1/clirr-ignored-differences.xml +++ b/proto-google-cloud-pubsub-v1/clirr-ignored-differences.xml @@ -16,4 +16,14 @@ com/google/pubsub/v1/*OrBuilder boolean has*(*) - \ No newline at end of file + + 7002 + com/google/pubsub/v1/TopicName + java.util.List toStrings(*) + + + 7002 + com/google/pubsub/v1/TopicName + java.util.List parse(*) + +