From 9bcc433e6d167f68256e0ede2a521296dcbc7412 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 31 Jan 2020 12:35:09 -0500 Subject: [PATCH] fix: shutdown grpc stubs properly when a subscriber is stopped (#74) * Modifying Publish example in README to match other examples given, and fix issue #6784 * fix: Modifying Publish example in README to match other examples, and fix Issue #11 * feat: Adding support for DLQs Adding delivery attempt count to PubsubMessages as a message attribute, and creating helper function to allow users to get the count without knowing implementation details. * Fix formatting * fix: making changes requested in pull request * fix: creating fix to not populate delivery attempt attribute when dead lettering is not enabled * Adding unit test for case in which a received message has no delivery attempt * Making MessageWaiter class more generic to also be used for outstanding ack operations * Waiting for acks to complete before shutting down a streaming subscriber connection * Fixing formatting error --- .../cloud/pubsub/v1/MessageDispatcher.java | 12 ++++----- .../com/google/cloud/pubsub/v1/Publisher.java | 10 +++---- .../v1/StreamingSubscriberConnection.java | 10 ++++++- .../google/cloud/pubsub/v1/Subscriber.java | 1 + .../v1/{MessageWaiter.java => Waiter.java} | 27 ++++++++++--------- ...MessageWaiterTest.java => WaiterTest.java} | 14 +++++----- 6 files changed, 43 insertions(+), 31 deletions(-) rename google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/{MessageWaiter.java => Waiter.java} (68%) rename google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/{MessageWaiterTest.java => WaiterTest.java} (80%) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 56fa8a3a9..c3f1a46cd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -72,7 +72,7 @@ class MessageDispatcher { private final AckProcessor ackProcessor; private final FlowController flowController; - private final MessageWaiter messagesWaiter; + private final Waiter messagesWaiter; // Maps ID to "total expiration time". If it takes longer than this, stop extending. private final ConcurrentMap pendingMessages = new ConcurrentHashMap<>(); @@ -145,7 +145,7 @@ private void forget() { return; } flowController.release(1, outstandingBytes); - messagesWaiter.incrementPendingMessages(-1); + messagesWaiter.incrementPendingCount(-1); } @Override @@ -205,7 +205,7 @@ void sendAckOperations( // 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS this.ackLatencyDistribution = ackLatencyDistribution; jobLock = new ReentrantLock(); - messagesWaiter = new MessageWaiter(); + messagesWaiter = new Waiter(); this.clock = clock; this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor); } @@ -268,7 +268,7 @@ public void run() { } void stop() { - messagesWaiter.waitNoMessages(); + messagesWaiter.waitComplete(); jobLock.lock(); try { if (backgroundJob != null) { @@ -331,9 +331,9 @@ void processReceivedMessages(List messages) { } private void processBatch(List batch) { - messagesWaiter.incrementPendingMessages(batch.size()); + messagesWaiter.incrementPendingCount(batch.size()); for (OutstandingMessage message : batch) { - // This is a blocking flow controller. We have already incremented MessageWaiter, so + // This is a blocking flow controller. We have already incremented messagesWaiter, so // shutdown will block on processing of all these messages anyway. try { flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize()); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 093590bb9..9af734db2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -104,7 +104,7 @@ public class Publisher { private final AtomicBoolean shutdown; private final BackgroundResource backgroundResources; - private final MessageWaiter messagesWaiter; + private final Waiter messagesWaiter; private ScheduledFuture currentAlarmFuture; private final ApiFunction messageTransform; @@ -173,7 +173,7 @@ private Publisher(Builder builder) throws IOException { backgroundResourceList.add(publisherStub); backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); shutdown = new AtomicBoolean(false); - messagesWaiter = new MessageWaiter(); + messagesWaiter = new Waiter(); } /** Topic which the publisher publishes to. */ @@ -249,7 +249,7 @@ public ApiFuture publish(PubsubMessage message) { messagesBatchLock.unlock(); } - messagesWaiter.incrementPendingMessages(1); + messagesWaiter.incrementPendingCount(1); // For messages without ordering keys, it is okay to send batches without holding // messagesBatchLock. @@ -423,7 +423,7 @@ public void onSuccess(PublishResponse result) { } } } finally { - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + messagesWaiter.incrementPendingCount(-outstandingBatch.size()); } } @@ -432,7 +432,7 @@ public void onFailure(Throwable t) { try { outstandingBatch.onFailure(t); } finally { - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + messagesWaiter.incrementPendingCount(-outstandingBatch.size()); } } }; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index ef3ce23d6..45f7fb90c 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -72,6 +72,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); + private final Waiter ackOperationsWaiter = new Waiter(); private final Lock lock = new ReentrantLock(); private ClientStream clientStream; @@ -116,6 +117,7 @@ protected void doStart() { @Override protected void doStop() { messageDispatcher.stop(); + ackOperationsWaiter.waitComplete(); lock.lock(); try { @@ -273,16 +275,18 @@ public void sendAckOperations( new ApiFutureCallback() { @Override public void onSuccess(Empty empty) { - // noop + ackOperationsWaiter.incrementPendingCount(-1); } @Override public void onFailure(Throwable t) { + ackOperationsWaiter.incrementPendingCount(-1); Level level = isAlive() ? Level.WARNING : Level.FINER; logger.log(level, "failed to send operations", t); } }; + int pendingOperations = 0; for (PendingModifyAckDeadline modack : ackDeadlineExtensions) { for (List idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) { ApiFuture future = @@ -294,6 +298,7 @@ public void onFailure(Throwable t) { .setAckDeadlineSeconds(modack.deadlineExtensionSeconds) .build()); ApiFutures.addCallback(future, loggingCallback, directExecutor()); + pendingOperations++; } } @@ -306,6 +311,9 @@ public void onFailure(Throwable t) { .addAllAckIds(idChunk) .build()); ApiFutures.addCallback(future, loggingCallback, directExecutor()); + pendingOperations++; } + + ackOperationsWaiter.incrementPendingCount(pendingOperations); } } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 2deafb2e7..9c84dea0a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -305,6 +305,7 @@ public void run() { // stop connection is no-op if connections haven't been started. stopAllStreamingConnections(); shutdownBackgroundResources(); + subStub.shutdownNow(); notifyStopped(); } catch (Exception e) { notifyFailed(e); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java similarity index 68% rename from google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java rename to google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java index cb238f3d3..e22125fee 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java @@ -18,25 +18,28 @@ import com.google.api.core.InternalApi; -/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */ -class MessageWaiter { - private int pendingMessages; +/** + * A barrier kind of object that helps keep track of pending actions and synchronously wait until + * all have completed. + */ +class Waiter { + private int pendingCount; - MessageWaiter() { - pendingMessages = 0; + Waiter() { + pendingCount = 0; } - public synchronized void incrementPendingMessages(int messages) { - this.pendingMessages += messages; - if (pendingMessages == 0) { + public synchronized void incrementPendingCount(int delta) { + this.pendingCount += delta; + if (pendingCount == 0) { notifyAll(); } } - public synchronized void waitNoMessages() { + public synchronized void waitComplete() { boolean interrupted = false; try { - while (pendingMessages > 0) { + while (pendingCount > 0) { try { wait(); } catch (InterruptedException e) { @@ -52,7 +55,7 @@ public synchronized void waitNoMessages() { } @InternalApi - public int pendingMessages() { - return pendingMessages; + public int pendingCount() { + return pendingCount; } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java similarity index 80% rename from google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java rename to google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java index 5f8e19875..ca8618378 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java @@ -22,14 +22,14 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** Tests for {@link MessageWaiter}. */ +/** Tests for {@link Waiter}. */ @RunWith(JUnit4.class) -public class MessageWaiterTest { +public class WaiterTest { @Test public void test() throws Exception { - final MessageWaiter waiter = new MessageWaiter(); - waiter.incrementPendingMessages(1); + final Waiter waiter = new Waiter(); + waiter.incrementPendingCount(1); final Thread mainThread = Thread.currentThread(); Thread t = @@ -40,14 +40,14 @@ public void run() { while (mainThread.getState() != Thread.State.WAITING) { Thread.yield(); } - waiter.incrementPendingMessages(-1); + waiter.incrementPendingCount(-1); } }); t.start(); - waiter.waitNoMessages(); + waiter.waitComplete(); t.join(); - assertEquals(0, waiter.pendingMessages()); + assertEquals(0, waiter.pendingCount()); } }