From d0d0b704642df8dee893d3f585aeb666e19696fb Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 14 Jun 2021 10:12:02 +0200 Subject: [PATCH] docs: block until the streaming pull shuts down (#424) Fixes #423. If subscriber client is used as a context manager, we need to block until the shutdown is complete before leaving the `with` block. See the issue description for more details. **PR checklist:** - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary) --- google/cloud/pubsub_v1/subscriber/client.py | 3 ++- samples/snippets/quickstart/sub.py | 3 ++- samples/snippets/schema.py | 6 ++++-- samples/snippets/subscriber.py | 15 ++++++++++----- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/client.py b/google/cloud/pubsub_v1/subscriber/client.py index 376530caa..567840859 100644 --- a/google/cloud/pubsub_v1/subscriber/client.py +++ b/google/cloud/pubsub_v1/subscriber/client.py @@ -188,7 +188,8 @@ def callback(message): try: future.result() except KeyboardInterrupt: - future.cancel() + future.cancel() # Trigger the shutdown. + future.result() # Block until the shutdown is complete. Args: subscription (str): The name of the subscription. The diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py index 0a7576e23..7a5732d20 100644 --- a/samples/snippets/quickstart/sub.py +++ b/samples/snippets/quickstart/sub.py @@ -43,7 +43,8 @@ def callback(message): # exiting while messages get processed in the callbacks. streaming_pull_future.result(timeout=timeout) except: # noqa - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. subscriber_client.close() diff --git a/samples/snippets/schema.py b/samples/snippets/schema.py index 37f9bba55..92c56d9ac 100644 --- a/samples/snippets/schema.py +++ b/samples/snippets/schema.py @@ -343,7 +343,8 @@ def callback(message): # unless an exception occurs first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscribe_avro_records] @@ -393,7 +394,8 @@ def callback(message): # unless an exception occurs first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscribe_proto_messages] diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 112c5a96a..d01860cf8 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -397,7 +397,8 @@ def callback(message): # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_async_pull] # [END pubsub_quickstart_subscriber] @@ -436,7 +437,8 @@ def callback(message): # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_async_pull_custom_attributes] @@ -474,7 +476,8 @@ def callback(message): # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_flow_settings] @@ -663,10 +666,11 @@ def callback(message): try: streaming_pull_future.result(timeout=timeout) except Exception as e: - streaming_pull_future.cancel() print( f"Listening for messages on {subscription_path} threw an exception: {e}." ) + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_error_listener] @@ -697,7 +701,8 @@ def callback(message): try: streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_dead_letter_delivery_attempt]