diff --git a/google/cloud/pubsub_v1/subscriber/client.py b/google/cloud/pubsub_v1/subscriber/client.py index 376530caa..567840859 100644 --- a/google/cloud/pubsub_v1/subscriber/client.py +++ b/google/cloud/pubsub_v1/subscriber/client.py @@ -188,7 +188,8 @@ def callback(message): try: future.result() except KeyboardInterrupt: - future.cancel() + future.cancel() # Trigger the shutdown. + future.result() # Block until the shutdown is complete. Args: subscription (str): The name of the subscription. The diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py index 0a7576e23..7a5732d20 100644 --- a/samples/snippets/quickstart/sub.py +++ b/samples/snippets/quickstart/sub.py @@ -43,7 +43,8 @@ def callback(message): # exiting while messages get processed in the callbacks. streaming_pull_future.result(timeout=timeout) except: # noqa - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. subscriber_client.close() diff --git a/samples/snippets/schema.py b/samples/snippets/schema.py index 37f9bba55..92c56d9ac 100644 --- a/samples/snippets/schema.py +++ b/samples/snippets/schema.py @@ -343,7 +343,8 @@ def callback(message): # unless an exception occurs first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscribe_avro_records] @@ -393,7 +394,8 @@ def callback(message): # unless an exception occurs first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscribe_proto_messages] diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 112c5a96a..d01860cf8 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -397,7 +397,8 @@ def callback(message): # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_async_pull] # [END pubsub_quickstart_subscriber] @@ -436,7 +437,8 @@ def callback(message): # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_async_pull_custom_attributes] @@ -474,7 +476,8 @@ def callback(message): # unless an exception is encountered first. streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_flow_settings] @@ -663,10 +666,11 @@ def callback(message): try: streaming_pull_future.result(timeout=timeout) except Exception as e: - streaming_pull_future.cancel() print( f"Listening for messages on {subscription_path} threw an exception: {e}." ) + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_subscriber_error_listener] @@ -697,7 +701,8 @@ def callback(message): try: streaming_pull_future.result(timeout=timeout) except TimeoutError: - streaming_pull_future.cancel() + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. # [END pubsub_dead_letter_delivery_attempt]