From 7da39dd698b2feb1e03406cb89b8600b46a73cff Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 Feb 2020 14:05:54 +0000 Subject: [PATCH 1/2] fix: shutdown error on streaming pull callback error --- google/cloud/pubsub_v1/subscriber/futures.py | 5 +++++ .../pubsub_v1/subscriber/test_futures_subscriber.py | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/futures.py b/google/cloud/pubsub_v1/subscriber/futures.py index 12504c18b..f9fdd76ab 100644 --- a/google/cloud/pubsub_v1/subscriber/futures.py +++ b/google/cloud/pubsub_v1/subscriber/futures.py @@ -33,6 +33,11 @@ def __init__(self, manager): self._cancelled = False def _on_close_callback(self, manager, result): + if self.done(): + # The future has already been resolved in a different thread, + # nothing to do on the streaming pull manager shutdown. + return + if result is None: self.set_result(True) else: diff --git a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py index 2b4566018..e29d3e7f9 100644 --- a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py +++ b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py @@ -57,6 +57,18 @@ def test__on_close_callback_failure(self): assert not future.running() + def test__on_close_callback_future_already_done(self): + future = self.make_future() + + future.set_result("foo") + assert future.done() + + # invoking on close callback should result in an error + future._on_close_callback(mock.sentinel.manager, "bar") + + result = future.result() + assert result == "foo" # on close callback was a no-op + def test_cancel(self): future = self.make_future() From d38494d4ca508e4fd00a9a292e69825d732ca6e2 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 Feb 2020 16:01:23 +0000 Subject: [PATCH 2/2] fix incorrect comment --- tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py index e29d3e7f9..909337cc8 100644 --- a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py +++ b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py @@ -63,7 +63,7 @@ def test__on_close_callback_future_already_done(self): future.set_result("foo") assert future.done() - # invoking on close callback should result in an error + # invoking on close callback should not result in an error future._on_close_callback(mock.sentinel.manager, "bar") result = future.result()