From adbcc0c6777e35eae24b538e6c48f9ef7485a786 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Wed, 3 Mar 2021 10:31:24 -0800 Subject: [PATCH] fix: properly shutdown subscriber stub on permanent streaming pull failure (#539) * fix: stop the subscriber stub on streaming pull failure --- .../pubsub/v1/StreamingSubscriberConnection.java | 9 +++++++-- .../java/com/google/cloud/pubsub/v1/Subscriber.java | 13 ++++++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 98e055715..249d896b7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -149,8 +149,7 @@ protected void doStart() { @Override protected void doStop() { - messageDispatcher.stop(); - ackOperationsWaiter.waitComplete(); + runShutdown(); lock.lock(); try { @@ -161,6 +160,11 @@ protected void doStop() { } } + private void runShutdown() { + messageDispatcher.stop(); + ackOperationsWaiter.waitComplete(); + } + private class StreamingPullResponseObserver implements ResponseObserver { final SettableApiFuture errorFuture; @@ -282,6 +286,7 @@ public void onFailure(Throwable cause) { ApiExceptionFactory.createException( cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false); logger.log(Level.SEVERE, "terminated streaming with exception", gaxException); + runShutdown(); notifyFailed(gaxException); return; } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 262cf7351..cca9986a8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -308,9 +308,7 @@ protected void doStop() { public void run() { try { // stop connection is no-op if connections haven't been started. - stopAllStreamingConnections(); - shutdownBackgroundResources(); - subStub.shutdownNow(); + runShutdown(); notifyStopped(); } catch (Exception e) { notifyFailed(e); @@ -320,6 +318,12 @@ public void run() { .start(); } + private void runShutdown() { + stopAllStreamingConnections(); + shutdownBackgroundResources(); + subStub.shutdownNow(); + } + private void startStreamingConnections() { synchronized (streamingSubscriberConnections) { for (int i = 0; i < numPullers; i++) { @@ -352,8 +356,7 @@ private void startStreamingConnections() { public void failed(State from, Throwable failure) { // If a connection failed is because of a fatal error, we should fail the // whole subscriber. - stopAllStreamingConnections(); - shutdownBackgroundResources(); + runShutdown(); try { notifyFailed(failure); } catch (IllegalStateException e) {