From 552539e7beb30833c39dd29bfcb0183a07895f97 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 Feb 2020 16:21:57 +0000 Subject: [PATCH] fix: shutdown error on streaming pull callback error (#40) * fix: shutdown error on streaming pull callback error * fix incorrect comment --- google/cloud/pubsub_v1/subscriber/futures.py | 5 +++++ .../pubsub_v1/subscriber/test_futures_subscriber.py | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/futures.py b/google/cloud/pubsub_v1/subscriber/futures.py index 12504c18b..f9fdd76ab 100644 --- a/google/cloud/pubsub_v1/subscriber/futures.py +++ b/google/cloud/pubsub_v1/subscriber/futures.py @@ -33,6 +33,11 @@ def __init__(self, manager): self._cancelled = False def _on_close_callback(self, manager, result): + if self.done(): + # The future has already been resolved in a different thread, + # nothing to do on the streaming pull manager shutdown. + return + if result is None: self.set_result(True) else: diff --git a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py index 2b4566018..909337cc8 100644 --- a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py +++ b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py @@ -57,6 +57,18 @@ def test__on_close_callback_failure(self): assert not future.running() + def test__on_close_callback_future_already_done(self): + future = self.make_future() + + future.set_result("foo") + assert future.done() + + # invoking on close callback should not result in an error + future._on_close_callback(mock.sentinel.manager, "bar") + + result = future.result() + assert result == "foo" # on close callback was a no-op + def test_cancel(self): future = self.make_future()