Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: shutdown grpc stubs properly when a subscriber is stopped #74

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
19dd129
Modifying Publish example in README to match other examples given, and
hannahrogers-google Nov 25, 2019
4158529
fix: Modifying Publish example in README to match other examples, and
hannahrogers-google Nov 25, 2019
7d704f0
Merge branch 'master' of github.com:hannahrogers-google/java-pubsub
hannahrogers-google Jan 9, 2020
9e0ebc6
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 9, 2020
01d41b7
feat: Adding support for DLQs
hannahrogers-google Jan 13, 2020
ed3b1ee
Fix formatting
hannahrogers-google Jan 13, 2020
72f7996
fix: making changes requested in pull request
hannahrogers-google Jan 14, 2020
389cb86
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 14, 2020
c9e4bd2
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 29, 2020
3738ec8
fix: creating fix to not populate delivery attempt attribute when dead
hannahrogers-google Jan 29, 2020
4ce1d3b
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 29, 2020
aecd4ca
Adding unit test for case in which a received message has no delivery…
hannahrogers-google Jan 30, 2020
77fa3b3
Making MessageWaiter class more generic to also be used for outstanding
hannahrogers-google Jan 30, 2020
b87023c
Merge branch 'master' of https://github.com/googleapis/java-pubsub
hannahrogers-google Jan 30, 2020
7086b3c
Merge branch 'master' into fix-subscriber-stop
hannahrogers-google Jan 30, 2020
da0e355
Waiting for acks to complete before shutting down a streaming subscriber
hannahrogers-google Jan 30, 2020
dc55fc1
Fixing formatting error
hannahrogers-google Jan 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -72,7 +72,7 @@ class MessageDispatcher {
private final AckProcessor ackProcessor;

private final FlowController flowController;
private final MessageWaiter messagesWaiter;
private final Waiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<String, AckHandler> pendingMessages = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -145,7 +145,7 @@ private void forget() {
return;
}
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
messagesWaiter.incrementPendingCount(-1);
}

@Override
Expand Down Expand Up @@ -205,7 +205,7 @@ void sendAckOperations(
// 601 buckets of 1s resolution from 0s to MAX_ACK_DEADLINE_SECONDS
this.ackLatencyDistribution = ackLatencyDistribution;
jobLock = new ReentrantLock();
messagesWaiter = new MessageWaiter();
messagesWaiter = new Waiter();
this.clock = clock;
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
}
Expand Down Expand Up @@ -268,7 +268,7 @@ public void run() {
}

void stop() {
messagesWaiter.waitNoMessages();
messagesWaiter.waitComplete();
jobLock.lock();
try {
if (backgroundJob != null) {
Expand Down Expand Up @@ -331,9 +331,9 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
}

private void processBatch(List<OutstandingMessage> batch) {
messagesWaiter.incrementPendingMessages(batch.size());
messagesWaiter.incrementPendingCount(batch.size());
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented MessageWaiter, so
// This is a blocking flow controller. We have already incremented messagesWaiter, so
// shutdown will block on processing of all these messages anyway.
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
Expand Down
Expand Up @@ -104,7 +104,7 @@ public class Publisher {

private final AtomicBoolean shutdown;
private final BackgroundResource backgroundResources;
private final MessageWaiter messagesWaiter;
private final Waiter messagesWaiter;
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;

Expand Down Expand Up @@ -173,7 +173,7 @@ private Publisher(Builder builder) throws IOException {
backgroundResourceList.add(publisherStub);
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
shutdown = new AtomicBoolean(false);
messagesWaiter = new MessageWaiter();
messagesWaiter = new Waiter();
}

/** Topic which the publisher publishes to. */
Expand Down Expand Up @@ -249,7 +249,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
messagesBatchLock.unlock();
}

messagesWaiter.incrementPendingMessages(1);
messagesWaiter.incrementPendingCount(1);

// For messages without ordering keys, it is okay to send batches without holding
// messagesBatchLock.
Expand Down Expand Up @@ -423,7 +423,7 @@ public void onSuccess(PublishResponse result) {
}
}
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
messagesWaiter.incrementPendingCount(-outstandingBatch.size());
}
}

Expand All @@ -432,7 +432,7 @@ public void onFailure(Throwable t) {
try {
outstandingBatch.onFailure(t);
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
messagesWaiter.incrementPendingCount(-outstandingBatch.size());
}
}
};
Expand Down
Expand Up @@ -72,6 +72,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
private final Waiter ackOperationsWaiter = new Waiter();

private final Lock lock = new ReentrantLock();
private ClientStream<StreamingPullRequest> clientStream;
Expand Down Expand Up @@ -116,6 +117,7 @@ protected void doStart() {
@Override
protected void doStop() {
messageDispatcher.stop();
ackOperationsWaiter.waitComplete();

lock.lock();
try {
Expand Down Expand Up @@ -273,16 +275,18 @@ public void sendAckOperations(
new ApiFutureCallback<Empty>() {
@Override
public void onSuccess(Empty empty) {
// noop
ackOperationsWaiter.incrementPendingCount(-1);
}

@Override
public void onFailure(Throwable t) {
ackOperationsWaiter.incrementPendingCount(-1);
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};

int pendingOperations = 0;
for (PendingModifyAckDeadline modack : ackDeadlineExtensions) {
for (List<String> idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) {
ApiFuture<Empty> future =
Expand All @@ -294,6 +298,7 @@ public void onFailure(Throwable t) {
.setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
.build());
ApiFutures.addCallback(future, loggingCallback, directExecutor());
pendingOperations++;
}
}

Expand All @@ -306,6 +311,9 @@ public void onFailure(Throwable t) {
.addAllAckIds(idChunk)
.build());
ApiFutures.addCallback(future, loggingCallback, directExecutor());
pendingOperations++;
}

ackOperationsWaiter.incrementPendingCount(pendingOperations);
}
}
Expand Up @@ -305,6 +305,7 @@ public void run() {
// stop connection is no-op if connections haven't been started.
stopAllStreamingConnections();
shutdownBackgroundResources();
subStub.shutdownNow();
notifyStopped();
} catch (Exception e) {
notifyFailed(e);
Expand Down
Expand Up @@ -18,25 +18,28 @@

import com.google.api.core.InternalApi;

/** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */
class MessageWaiter {
private int pendingMessages;
/**
* A barrier kind of object that helps keep track of pending actions and synchronously wait until
* all have completed.
*/
class Waiter {
private int pendingCount;

MessageWaiter() {
pendingMessages = 0;
Waiter() {
pendingCount = 0;
}

public synchronized void incrementPendingMessages(int messages) {
this.pendingMessages += messages;
if (pendingMessages == 0) {
public synchronized void incrementPendingCount(int delta) {
this.pendingCount += delta;
if (pendingCount == 0) {
notifyAll();
}
}

public synchronized void waitNoMessages() {
public synchronized void waitComplete() {
boolean interrupted = false;
try {
while (pendingMessages > 0) {
while (pendingCount > 0) {
try {
wait();
} catch (InterruptedException e) {
Expand All @@ -52,7 +55,7 @@ public synchronized void waitNoMessages() {
}

@InternalApi
public int pendingMessages() {
return pendingMessages;
public int pendingCount() {
return pendingCount;
}
}
Expand Up @@ -22,14 +22,14 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link MessageWaiter}. */
/** Tests for {@link Waiter}. */
@RunWith(JUnit4.class)
public class MessageWaiterTest {
public class WaiterTest {

@Test
public void test() throws Exception {
final MessageWaiter waiter = new MessageWaiter();
waiter.incrementPendingMessages(1);
final Waiter waiter = new Waiter();
waiter.incrementPendingCount(1);

final Thread mainThread = Thread.currentThread();
Thread t =
Expand All @@ -40,14 +40,14 @@ public void run() {
while (mainThread.getState() != Thread.State.WAITING) {
Thread.yield();
}
waiter.incrementPendingMessages(-1);
waiter.incrementPendingCount(-1);
}
});
t.start();

waiter.waitNoMessages();
waiter.waitComplete();
t.join();

assertEquals(0, waiter.pendingMessages());
assertEquals(0, waiter.pendingCount());
}
}