From 093cabff9f0464b1dfaa8f373b6fffbc439518de Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 1 Oct 2021 21:22:50 +0200 Subject: [PATCH] docs: add type annotations to codebase (#509) Closes #500. **PR checklist:** - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsub/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary) --- google/cloud/pubsub_v1/_gapic.py | 9 +- google/cloud/pubsub_v1/futures.py | 14 +- .../cloud/pubsub_v1/publisher/_batch/base.py | 50 +++-- .../pubsub_v1/publisher/_batch/thread.py | 114 +++++----- .../pubsub_v1/publisher/_sequencer/base.py | 25 ++- .../publisher/_sequencer/ordered_sequencer.py | 60 +++--- .../_sequencer/unordered_sequencer.py | 47 ++-- google/cloud/pubsub_v1/publisher/client.py | 114 ++++++---- .../cloud/pubsub_v1/publisher/exceptions.py | 2 +- .../pubsub_v1/publisher/flow_controller.py | 44 ++-- google/cloud/pubsub_v1/publisher/futures.py | 12 +- .../subscriber/_protocol/dispatcher.py | 60 ++++-- .../subscriber/_protocol/heartbeater.py | 14 +- .../subscriber/_protocol/helper_threads.py | 53 +++-- .../subscriber/_protocol/histogram.py | 49 +++-- .../pubsub_v1/subscriber/_protocol/leaser.py | 41 ++-- .../subscriber/_protocol/messages_on_hold.py | 49 +++-- .../subscriber/_protocol/requests.py | 42 ++-- .../_protocol/streaming_pull_manager.py | 184 ++++++++-------- google/cloud/pubsub_v1/subscriber/client.py | 63 +++--- google/cloud/pubsub_v1/subscriber/futures.py | 19 +- google/cloud/pubsub_v1/subscriber/message.py | 109 ++++++---- .../cloud/pubsub_v1/subscriber/scheduler.py | 64 +++--- google/cloud/pubsub_v1/types.py | 203 +++++++++--------- 24 files changed, 805 insertions(+), 636 deletions(-) diff --git a/google/cloud/pubsub_v1/_gapic.py b/google/cloud/pubsub_v1/_gapic.py index 2a50994fe..e25c1dc6c 100644 --- a/google/cloud/pubsub_v1/_gapic.py +++ b/google/cloud/pubsub_v1/_gapic.py @@ -15,9 +15,12 @@ from __future__ import absolute_import import functools +from typing import Callable, Container, Type -def add_methods(source_class, denylist=()): +def add_methods( + source_class: Type, denylist: Container[str] = () +) -> Callable[[Type], Type]: """Add wrapped versions of the `api` member's methods to the class. Any methods passed in `denylist` are not added. @@ -25,7 +28,7 @@ def add_methods(source_class, denylist=()): not added. """ - def wrap(wrapped_fx, lookup_fx): + def wrap(wrapped_fx: Callable, lookup_fx: Callable): """Wrap a GAPIC method; preserve its name and docstring.""" # If this is a static or class method, then we do *not* # send self as the first argument. @@ -40,7 +43,7 @@ def wrap(wrapped_fx, lookup_fx): fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw) # noqa return functools.wraps(wrapped_fx)(fx) - def actual_decorator(cls): + def actual_decorator(cls: Type) -> Type: # Reflectively iterate over most of the methods on the source class # (the GAPIC) and make wrapped versions available on this client. for name in dir(source_class): diff --git a/google/cloud/pubsub_v1/futures.py b/google/cloud/pubsub_v1/futures.py index 4dc72fdaa..d8acc8ea5 100644 --- a/google/cloud/pubsub_v1/futures.py +++ b/google/cloud/pubsub_v1/futures.py @@ -15,6 +15,7 @@ from __future__ import absolute_import import concurrent.futures +from typing import Any, NoReturn import google.api_core.future @@ -29,19 +30,16 @@ class Future(concurrent.futures.Future, google.api_core.future.Future): methods in this library. """ - def running(self): - """Return ``True`` if the associated Pub/Sub action has not yet completed. - - Returns: bool: - """ + def running(self) -> bool: + """Return ``True`` if the associated Pub/Sub action has not yet completed.""" return not self.done() - def set_running_or_notify_cancel(self): + def set_running_or_notify_cancel(self) -> NoReturn: raise NotImplementedError( "Only used by executors from `concurrent.futures` package." ) - def set_result(self, result): + def set_result(self, result: Any): """Set the return value of work associated with the future. Do not use this method, it should only be used internally by the library and its @@ -49,7 +47,7 @@ def set_result(self, result): """ return super().set_result(result=result) - def set_exception(self, exception): + def set_exception(self, exception: Exception): """Set the result of the future as being the given exception. Do not use this method, it should only be used internally by the library and its diff --git a/google/cloud/pubsub_v1/publisher/_batch/base.py b/google/cloud/pubsub_v1/publisher/_batch/base.py index 812e0e0e2..f32028360 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -16,6 +16,14 @@ import abc import enum +import typing +from typing import Optional, Sequence + + +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1 import types + from google.pubsub_v1 import types as gapic_types class Batch(metaclass=abc.ABCMeta): @@ -50,7 +58,7 @@ def __len__(self): @staticmethod @abc.abstractmethod - def make_lock(): # pragma: NO COVER + def make_lock() -> None: # pragma: NO COVER """Return a lock in the chosen concurrency model. Returns: @@ -60,17 +68,17 @@ def make_lock(): # pragma: NO COVER @property @abc.abstractmethod - def messages(self): # pragma: NO COVER + def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER """Return the messages currently in the batch. Returns: - Sequence: The messages currently in the batch. + The messages currently in the batch. """ raise NotImplementedError @property @abc.abstractmethod - def size(self): # pragma: NO COVER + def size(self) -> int: # pragma: NO COVER """Return the total size of all of the messages currently in the batch. The size includes any overhead of the actual ``PublishRequest`` that is @@ -84,42 +92,45 @@ def size(self): # pragma: NO COVER @property @abc.abstractmethod - def settings(self): # pragma: NO COVER + def settings(self) -> "types.BatchSettings": # pragma: NO COVER """Return the batch settings. Returns: - ~.pubsub_v1.types.BatchSettings: The batch settings. These are - considered immutable once the batch has been opened. + The batch settings. These are considered immutable once the batch has + been opened. """ raise NotImplementedError @property @abc.abstractmethod - def status(self): # pragma: NO COVER + def status(self) -> "BatchStatus": # pragma: NO COVER """Return the status of this batch. Returns: - str: The status of this batch. All statuses are human-readable, - all-lowercase strings. The ones represented in the - :class:`BaseBatch.Status` enum are special, but other statuses - are permitted. + The status of this batch. All statuses are human-readable, all-lowercase + strings. The ones represented in the :class:`BaseBatch.Status` enum are + special, but other statuses are permitted. """ raise NotImplementedError - def cancel(self, cancellation_reason): # pragma: NO COVER + def cancel( + self, cancellation_reason: "BatchCancellationReason" + ) -> None: # pragma: NO COVER """Complete pending futures with an exception. This method must be called before publishing starts (ie: while the batch is still accepting messages.) Args: - cancellation_reason (BatchCancellationReason): The reason why this - batch has been cancelled. + cancellation_reason: + The reason why this batch has been cancelled. """ raise NotImplementedError @abc.abstractmethod - def publish(self, message): # pragma: NO COVER + def publish( + self, message: "gapic_types.PubsubMessage" + ) -> Optional["pubsub_v1.publisher.futures.Future"]: # pragma: NO COVER """Publish a single message. Add the given message to this object; this will cause it to be @@ -129,11 +140,12 @@ def publish(self, message): # pragma: NO COVER This method is called by :meth:`~.PublisherClient.publish`. Args: - message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message. + message: The Pub/Sub message. Returns: - ~google.api_core.future.Future: An object conforming to the - :class:`concurrent.futures.Future` interface. + An object conforming to the :class:`concurrent.futures.Future` interface. + If :data:`None` is returned, that signals that the batch cannot + accept a message. """ raise NotImplementedError diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index e59dff00e..d68d00a0e 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -17,6 +17,8 @@ import logging import threading import time +import typing +from typing import Any, Callable, Optional, Sequence import google.api_core.exceptions from google.api_core import gapic_v1 @@ -25,6 +27,12 @@ from google.cloud.pubsub_v1.publisher._batch import base from google.pubsub_v1 import types as gapic_types +if typing.TYPE_CHECKING: # pragma: NO COVER + from google import api_core + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1 import types + from google.cloud.pubsub_v1 import PublisherClient + _LOGGER = logging.getLogger(__name__) _CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING) @@ -56,36 +64,36 @@ class Batch(base.Batch): implementation details. Args: - client (~.pubsub_v1.PublisherClient): The publisher client used to - create this batch. - topic (str): The topic. The format for this is - ``projects/{project}/topics/{topic}``. - settings (~.pubsub_v1.types.BatchSettings): The settings for batch - publishing. These should be considered immutable once the batch - has been opened. - batch_done_callback (Callable[[bool], Any]): Callback called when the - response for a batch publish has been received. Called with one - boolean argument: successfully published or a permanent error - occurred. Temporary errors are not surfaced because they are retried + client: + The publisher client used to create this batch. + topic: + The topic. The format for this is ``projects/{project}/topics/{topic}``. + settings: + The settings for batch publishing. These should be considered immutable + once the batch has been opened. + batch_done_callback: + Callback called when the response for a batch publish has been received. + Called with one boolean argument: successfully published or a permanent + error occurred. Temporary errors are not surfaced because they are retried at a lower level. - commit_when_full (bool): Whether to commit the batch when the batch - is full. - commit_retry (Optional[google.api_core.retry.Retry]): Designation of what - errors, if any, should be retried when commiting the batch. If not - provided, a default retry is used. - commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): - The timeout to apply when commiting the batch. If not provided, a - default timeout is used. + commit_when_full: + Whether to commit the batch when the batch is full. + commit_retry: + Designation of what errors, if any, should be retried when commiting + the batch. If not provided, a default retry is used. + commit_timeout: + The timeout to apply when commiting the batch. If not provided, a default + timeout is used. """ def __init__( self, - client, - topic, - settings, - batch_done_callback=None, - commit_when_full=True, - commit_retry=gapic_v1.method.DEFAULT, + client: "PublisherClient", + topic: str, + settings: "types.BatchSettings", + batch_done_callback: Callable[[bool], Any] = None, + commit_when_full: bool = True, + commit_retry: "api_core.retry.Retry" = gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): self._client = client @@ -113,66 +121,65 @@ def __init__( self._commit_timeout = commit_timeout @staticmethod - def make_lock(): + def make_lock() -> threading.Lock: """Return a threading lock. Returns: - _thread.Lock: A newly created lock. + A newly created lock. """ return threading.Lock() @property - def client(self): - """~.pubsub_v1.client.PublisherClient: A publisher client.""" + def client(self) -> "PublisherClient": + """A publisher client.""" return self._client @property - def messages(self): - """Sequence: The messages currently in the batch.""" + def messages(self) -> Sequence[gapic_types.PubsubMessage]: + """The messages currently in the batch.""" return self._messages @property - def settings(self): + def settings(self) -> "types.BatchSettings": """Return the batch settings. Returns: - ~.pubsub_v1.types.BatchSettings: The batch settings. These are - considered immutable once the batch has been opened. + The batch settings. These are considered immutable once the batch has + been opened. """ return self._settings @property - def size(self): + def size(self) -> int: """Return the total size of all of the messages currently in the batch. The size includes any overhead of the actual ``PublishRequest`` that is sent to the backend. Returns: - int: The total size of all of the messages currently - in the batch (including the request overhead), in bytes. + The total size of all of the messages currently in the batch (including + the request overhead), in bytes. """ return self._size @property - def status(self): + def status(self) -> base.BatchStatus: """Return the status of this batch. Returns: - str: The status of this batch. All statuses are human-readable, - all-lowercase strings. + The status of this batch. All statuses are human-readable, all-lowercase + strings. """ return self._status - def cancel(self, cancellation_reason): + def cancel(self, cancellation_reason: base.BatchCancellationReason) -> None: """Complete pending futures with an exception. This method must be called before publishing starts (ie: while the batch is still accepting messages.) Args: - cancellation_reason (BatchCancellationReason): The reason why this - batch has been cancelled. + The reason why this batch has been cancelled. """ with self._state_lock: @@ -185,7 +192,7 @@ def cancel(self, cancellation_reason): future.set_exception(exc) self._status = base.BatchStatus.ERROR - def commit(self): + def commit(self) -> None: """Actually publish all of the messages on the active batch. .. note:: @@ -210,7 +217,7 @@ def commit(self): self._start_commit_thread() - def _start_commit_thread(self): + def _start_commit_thread(self) -> None: """Start a new thread to actually handle the commit.""" # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue. # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303 @@ -220,7 +227,7 @@ def _start_commit_thread(self): ) commit_thread.start() - def _commit(self): + def _commit(self) -> None: """Actually publish all of the messages on the active batch. This moves the batch out from being the active batch to an in progress @@ -320,7 +327,9 @@ def _commit(self): if self._batch_done_callback is not None: self._batch_done_callback(batch_transport_succeeded) - def publish(self, message): + def publish( + self, message: gapic_types.PubsubMessage + ) -> Optional["pubsub_v1.publisher.futures.Future"]: """Publish a single message. Add the given message to this object; this will cause it to be @@ -331,13 +340,12 @@ def publish(self, message): This method is called by :meth:`~.PublisherClient.publish`. Args: - message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message. + message: The Pub/Sub message. Returns: - Optional[~google.api_core.future.Future]: An object conforming to - the :class:`~concurrent.futures.Future` interface or :data:`None`. - If :data:`None` is returned, that signals that the batch cannot - accept a message. + An object conforming to the :class:`~concurrent.futures.Future` interface + or :data:`None`. If :data:`None` is returned, that signals that the batch + cannot accept a message. Raises: pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing @@ -398,5 +406,5 @@ def publish(self, message): return future - def _set_status(self, status): + def _set_status(self, status: base.BatchStatus): self._status = status diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/base.py b/google/cloud/pubsub_v1/publisher/_sequencer/base.py index 60a7d269c..49bdcb740 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/base.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/base.py @@ -15,10 +15,15 @@ from __future__ import absolute_import import abc +import typing from google.api_core import gapic_v1 from google.pubsub_v1 import types as gapic_types +if typing.TYPE_CHECKING: # pragma: NO COVER + from concurrent import futures + from google.api_core import retry + class Sequencer(metaclass=abc.ABCMeta): """The base class for sequencers for Pub/Sub publishing. A sequencer @@ -27,7 +32,7 @@ class Sequencer(metaclass=abc.ABCMeta): @staticmethod @abc.abstractmethod - def is_finished(self): # pragma: NO COVER + def is_finished(self) -> bool: # pragma: NO COVER """ Whether the sequencer is finished and should be cleaned up. Returns: @@ -37,7 +42,7 @@ def is_finished(self): # pragma: NO COVER @staticmethod @abc.abstractmethod - def unpause(self, message): # pragma: NO COVER + def unpause(self) -> None: # pragma: NO COVER """ Unpauses this sequencer. Raises: @@ -50,24 +55,24 @@ def unpause(self, message): # pragma: NO COVER @abc.abstractmethod def publish( self, - message, - retry=None, + message: gapic_types.PubsubMessage, + retry: "retry.Retry" = None, timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - ): # pragma: NO COVER + ) -> "futures.Future": # pragma: NO COVER """ Publish message for this ordering key. Args: - message (~.pubsub_v1.types.PubsubMessage): + message: The Pub/Sub message. - retry (Optional[google.api_core.retry.Retry]): + retry: The retry settings to apply when publishing the message. - timeout (:class:`~.pubsub_v1.types.TimeoutType`): + timeout: The timeout to apply when publishing the message. Returns: A class instance that conforms to Python Standard library's - :class:`~concurrent.futures.Future` interface (but not an - instance of that class). The future might return immediately with a + :class:`~concurrent.futures.Future` interface. The future might return + immediately with a `pubsub_v1.publisher.exceptions.PublishToPausedOrderingKeyException` if the ordering key is paused. Otherwise, the future tracks the lifetime of the message publish. diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index 83dd0c921..106c4da99 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -16,6 +16,8 @@ import collections import concurrent.futures as futures import threading +import typing +from typing import Iterable, Sequence from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher import exceptions @@ -23,6 +25,11 @@ from google.cloud.pubsub_v1.publisher._batch import base as batch_base from google.pubsub_v1 import types as gapic_types +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.api_core import retry + from google.cloud.pubsub_v1 import PublisherClient + from google.cloud.pubsub_v1.publisher import _batch + class _OrderedSequencerStatus(str, enum.Enum): """An enum-like class representing valid statuses for an OrderedSequencer. @@ -77,14 +84,15 @@ class OrderedSequencer(sequencer_base.Sequencer): Public methods are thread-safe. Args: - client (~.pubsub_v1.PublisherClient): The publisher client used to - create this sequencer. - topic (str): The topic. The format for this is - ``projects/{project}/topics/{topic}``. - ordering_key (str): The ordering key for this sequencer. + client: + The publisher client used to create this sequencer. + topic: + The topic. The format for this is ``projects/{project}/topics/{topic}``. + ordering_key: + The ordering key for this sequencer. """ - def __init__(self, client, topic, ordering_key): + def __init__(self, client: "PublisherClient", topic: str, ordering_key: str): self._client = client self._topic = topic self._ordering_key = ordering_key @@ -97,16 +105,16 @@ def __init__(self, client, topic, ordering_key): # See _OrderedSequencerStatus for valid state transitions. self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES - def is_finished(self): + def is_finished(self) -> bool: """ Whether the sequencer is finished and should be cleaned up. Returns: - bool: Whether the sequencer is finished and should be cleaned up. + Whether the sequencer is finished and should be cleaned up. """ with self._state_lock: return self._state == _OrderedSequencerStatus.FINISHED - def stop(self): + def stop(self) -> None: """ Permanently stop this sequencer. This differs from pausing, which may be resumed. Immediately commits @@ -133,7 +141,7 @@ def stop(self): batch = self._ordered_batches.pop() batch.cancel(batch_base.BatchCancellationReason.CLIENT_STOPPED) - def commit(self): + def commit(self) -> None: """ Commit the first batch, if unpaused. If paused or no batches exist, this method does nothing. @@ -151,7 +159,7 @@ def commit(self): # operation is idempotent. self._ordered_batches[0].commit() - def _batch_done_callback(self, success): + def _batch_done_callback(self, success: bool) -> None: """ Deal with completion of a batch. Called when a batch has finished publishing, with either a success @@ -199,7 +207,7 @@ def _batch_done_callback(self, success): if ensure_cleanup_and_commit_timer_runs: self._client.ensure_cleanup_and_commit_timer_runs() - def _pause(self): + def _pause(self) -> None: """ Pause this sequencer: set state to paused, cancel all batches, and clear the list of ordered batches. @@ -215,7 +223,7 @@ def _pause(self): ) self._ordered_batches.clear() - def unpause(self): + def unpause(self) -> None: """ Unpause this sequencer. Raises: @@ -229,16 +237,16 @@ def unpause(self): def _create_batch( self, - commit_retry=gapic_v1.method.DEFAULT, + commit_retry: "retry.Retry" = gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - ): + ) -> "_batch.thread.Batch": """ Create a new batch using the client's batch class and other stored settings. Args: - commit_retry (Optional[google.api_core.retry.Retry]): + commit_retry: The retry settings to apply when publishing the batch. - commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): + commit_timeout: The timeout to apply when publishing the batch. """ return self._client._batch_class( @@ -253,18 +261,18 @@ def _create_batch( def publish( self, - message, - retry=gapic_v1.method.DEFAULT, + message: gapic_types.PubsubMessage, + retry: "retry.Retry" = gapic_v1.method.DEFAULT, timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - ): + ) -> futures.Future: """ Publish message for this ordering key. Args: - message (~.pubsub_v1.types.PubsubMessage): + message: The Pub/Sub message. - retry (Optional[google.api_core.retry.Retry]): + retry: The retry settings to apply when publishing the message. - timeout (:class:`~.pubsub_v1.types.TimeoutType`): + timeout: The timeout to apply when publishing the message. Returns: @@ -317,13 +325,13 @@ def publish( return future # Used only for testing. - def _set_batch(self, batch): + def _set_batch(self, batch: "_batch.thread.Batch") -> None: self._ordered_batches = collections.deque([batch]) # Used only for testing. - def _set_batches(self, batches): + def _set_batches(self, batches: Iterable["_batch.thread.Batch"]) -> None: self._ordered_batches = collections.deque(batches) # Used only for testing. - def _get_batches(self): + def _get_batches(self) -> Sequence["_batch.thread.Batch"]: return self._ordered_batches diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index 76dd1cad7..91d47b948 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -12,11 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing + from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher._sequencer import base from google.pubsub_v1 import types as gapic_types +if typing.TYPE_CHECKING: # pragma: NO COVER + from concurrent import futures + from google.api_core import retry + from google.cloud.pubsub_v1 import PublisherClient + from google.cloud.pubsub_v1.publisher import _batch + class UnorderedSequencer(base.Sequencer): """ Sequences messages into batches for one topic without any ordering. @@ -24,17 +32,17 @@ class UnorderedSequencer(base.Sequencer): Public methods are NOT thread-safe. """ - def __init__(self, client, topic): + def __init__(self, client: "PublisherClient", topic: str): self._client = client self._topic = topic self._current_batch = None self._stopped = False - def is_finished(self): + def is_finished(self) -> bool: """ Whether the sequencer is finished and should be cleaned up. Returns: - bool: Whether the sequencer is finished and should be cleaned up. + Whether the sequencer is finished and should be cleaned up. """ # TODO: Implement. Not implementing yet because of possible performance # impact due to extra locking required. This does mean that @@ -42,7 +50,7 @@ def is_finished(self): # previously existing behavior. return False - def stop(self): + def stop(self) -> None: """ Stop the sequencer. Subsequent publishes will fail. @@ -56,7 +64,7 @@ def stop(self): self.commit() self._stopped = True - def commit(self): + def commit(self) -> None: """ Commit the batch. Raises: @@ -74,22 +82,22 @@ def commit(self): # batch. self._current_batch = None - def unpause(self): + def unpause(self) -> typing.NoReturn: """ Not relevant for this class. """ raise NotImplementedError def _create_batch( self, - commit_retry=gapic_v1.method.DEFAULT, + commit_retry: "retry.Retry" = gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - ): + ) -> "_batch.thread.Batch": """ Create a new batch using the client's batch class and other stored settings. Args: - commit_retry (Optional[google.api_core.retry.Retry]): + commit_retry: The retry settings to apply when publishing the batch. - commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): + commit_timeout: The timeout to apply when publishing the batch. """ return self._client._batch_class( @@ -104,24 +112,23 @@ def _create_batch( def publish( self, - message, - retry=gapic_v1.method.DEFAULT, + message: gapic_types.PubsubMessage, + retry: "retry.Retry" = gapic_v1.method.DEFAULT, timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - ): + ) -> "futures.Future": """ Batch message into existing or new batch. Args: - message (~.pubsub_v1.types.PubsubMessage): + message: The Pub/Sub message. - retry (Optional[google.api_core.retry.Retry]): + retry: The retry settings to apply when publishing the message. - timeout (:class:`~.pubsub_v1.types.TimeoutType`): + timeout: The timeout to apply when publishing the message. Returns: - ~google.api_core.future.Future: An object conforming to - the :class:`~concurrent.futures.Future` interface. The future tracks - the publishing status of the message. + An object conforming to the :class:`~concurrent.futures.Future` interface. + The future tracks the publishing status of the message. Raises: RuntimeError: @@ -151,5 +158,5 @@ def publish( return future # Used only for testing. - def _set_batch(self, batch): + def _set_batch(self, batch: "_batch.thread.Batch") -> None: self._current_batch = batch diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 1c5381d08..7e7c01c19 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -20,6 +20,8 @@ import pkg_resources import threading import time +import typing +from typing import Any, Sequence, Type, Union from google.api_core import gapic_v1 from google.auth.credentials import AnonymousCredentials @@ -43,6 +45,13 @@ # PIP package. __version__ = "0.0" +if typing.TYPE_CHECKING: # pragma: NO COVER + from google import api_core + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.publisher._sequencer.base import Sequencer + from google.cloud.pubsub_v1.publisher import _batch + + _LOGGER = logging.getLogger(__name__) _DENYLISTED_METHODS = ( @@ -63,13 +72,14 @@ class Client(object): get sensible defaults. Args: - batch_settings (~google.cloud.pubsub_v1.types.BatchSettings): The - settings for batch publishing. - publisher_options (~google.cloud.pubsub_v1.types.PublisherOptions): The - options for the publisher client. Note that enabling message ordering will - override the publish retry timeout to be infinite. - kwargs (dict): Any additional arguments provided are sent as keyword - arguments to the underlying + batch_settings: + The settings for batch publishing. + publisher_options: + The options for the publisher client. Note that enabling message ordering + will override the publish retry timeout to be infinite. + kwargs: + Any additional arguments provided are sent as keyword arguments to the + underlying :class:`~google.cloud.pubsub_v1.gapic.publisher_client.PublisherClient`. Generally you should not need to set additional keyword arguments. Regional endpoints can be set via ``client_options`` that @@ -104,14 +114,19 @@ class Client(object): ) """ - def __init__(self, batch_settings=(), publisher_options=(), **kwargs): + def __init__( + self, + batch_settings: Union[types.BatchSettings, Sequence] = (), + publisher_options: Union[types.PublisherOptions, Sequence] = (), + **kwargs: Any, + ): assert ( type(batch_settings) is types.BatchSettings or len(batch_settings) == 0 - ), "batch_settings must be of type BatchSettings or an empty tuple." + ), "batch_settings must be of type BatchSettings or an empty sequence." assert ( type(publisher_options) is types.PublisherOptions or len(publisher_options) == 0 - ), "publisher_options must be of type PublisherOptions or an empty tuple." + ), "publisher_options must be of type PublisherOptions or an empty sequence." # Sanity check: Is our goal to use the emulator? # If so, create a grpc insecure channel with the emulator host @@ -146,20 +161,25 @@ def __init__(self, batch_settings=(), publisher_options=(), **kwargs): self._flow_controller = FlowController(self.publisher_options.flow_control) @classmethod - def from_service_account_file(cls, filename, batch_settings=(), **kwargs): + def from_service_account_file( + cls, + filename: str, + batch_settings: Union[types.BatchSettings, Sequence] = (), + **kwargs: Any, + ) -> "Client": """Creates an instance of this client using the provided credentials file. Args: - filename (str): The path to the service account private key json - file. - batch_settings (~google.cloud.pubsub_v1.types.BatchSettings): The - settings for batch publishing. - kwargs: Additional arguments to pass to the constructor. + filename: + The path to the service account private key JSON file. + batch_settings: + The settings for batch publishing. + kwargs: + Additional arguments to pass to the constructor. Returns: - A Publisher :class:`~google.cloud.pubsub_v1.publisher.client.Client` - instance that is the constructed client. + A Publisher instance that is the constructed client. """ credentials = service_account.Credentials.from_service_account_file(filename) kwargs["credentials"] = credentials @@ -168,15 +188,15 @@ def from_service_account_file(cls, filename, batch_settings=(), **kwargs): from_service_account_json = from_service_account_file @property - def target(self): + def target(self) -> str: """Return the target (where the API is). Returns: - str: The location of the API. + The location of the API. """ return self._target - def _get_or_create_sequencer(self, topic, ordering_key): + def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> "Sequencer": """ Get an existing sequencer or create a new one given the (topic, ordering_key) pair. """ @@ -193,11 +213,11 @@ def _get_or_create_sequencer(self, topic, ordering_key): return sequencer - def resume_publish(self, topic, ordering_key): + def resume_publish(self, topic: str, ordering_key: str) -> None: """ Resume publish on an ordering key that has had unrecoverable errors. Args: - topic (str): The topic to publish messages to. + topic: The topic to publish messages to. ordering_key: A string that identifies related messages for which publish order should be respected. @@ -231,13 +251,13 @@ def resume_publish(self, topic, ordering_key): def publish( self, - topic, - data, - ordering_key="", - retry=gapic_v1.method.DEFAULT, + topic: str, + data: bytes, + ordering_key: str = "", + retry: "api_core.retry.Retry" = gapic_v1.method.DEFAULT, timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - **attrs - ): + **attrs: Union[bytes, str], + ) -> "pubsub_v1.publisher.futures.Future": """Publish a single message. .. note:: @@ -266,22 +286,22 @@ def publish( >>> response = client.publish(topic, data, username='guido') Args: - topic (str): The topic to publish messages to. - data (bytes): A bytestring representing the message body. This + topic: The topic to publish messages to. + data: A bytestring representing the message body. This must be a bytestring. ordering_key: A string that identifies related messages for which publish order should be respected. Message ordering must be enabled for this client to use this feature. - retry (Optional[google.api_core.retry.Retry]): Designation of what - errors, if any, should be retried. If `ordering_key` is specified, - the total retry deadline will be changed to "infinity". + retry: + Designation of what errors, if any, should be retried. If `ordering_key` + is specified, the total retry deadline will be changed to "infinity". If given, it overides any retry passed into the client through the ``publisher_options`` argument. - timeout (:class:`~.pubsub_v1.types.TimeoutType`): + timeout: The timeout for the RPC request. Can be used to override any timeout passed in through ``publisher_options`` when instantiating the client. - attrs (Mapping[str, str]): A dictionary of attributes to be + attrs: A dictionary of attributes to be sent as metadata. (These may be text strings or byte strings.) Returns: @@ -374,7 +394,7 @@ def on_publish_done(future): return future - def ensure_cleanup_and_commit_timer_runs(self): + def ensure_cleanup_and_commit_timer_runs(self) -> None: """ Ensure a cleanup/commit timer thread is running. If a cleanup/commit timer thread is already running, this does nothing. @@ -382,7 +402,7 @@ def ensure_cleanup_and_commit_timer_runs(self): with self._batch_lock: self._ensure_commit_timer_runs_no_lock() - def _ensure_commit_timer_runs_no_lock(self): + def _ensure_commit_timer_runs_no_lock(self) -> None: """ Ensure a commit timer thread is running, without taking _batch_lock. @@ -391,7 +411,7 @@ def _ensure_commit_timer_runs_no_lock(self): if not self._commit_thread and self.batch_settings.max_latency < float("inf"): self._start_commit_thread() - def _start_commit_thread(self): + def _start_commit_thread(self) -> None: """Start a new thread to actually wait and commit the sequencers.""" # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue. # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303 @@ -403,7 +423,7 @@ def _start_commit_thread(self): ) self._commit_thread.start() - def _wait_and_commit_sequencers(self): + def _wait_and_commit_sequencers(self) -> None: """ Wait up to the batching timeout, and commit all sequencers. """ # Sleep for however long we should be waiting. @@ -416,7 +436,7 @@ def _wait_and_commit_sequencers(self): self._commit_sequencers() self._commit_thread = None - def _commit_sequencers(self): + def _commit_sequencers(self) -> None: """ Clean up finished sequencers and commit the rest. """ finished_sequencer_keys = [ key @@ -429,7 +449,7 @@ def _commit_sequencers(self): for sequencer in self._sequencers.values(): sequencer.commit() - def stop(self): + def stop(self) -> None: """Immediately publish all outstanding messages. Asynchronously sends all outstanding messages and @@ -458,15 +478,19 @@ def stop(self): sequencer.stop() # Used only for testing. - def _set_batch(self, topic, batch, ordering_key=""): + def _set_batch( + self, topic: str, batch: "_batch.thread.Batch", ordering_key: str = "" + ) -> None: sequencer = self._get_or_create_sequencer(topic, ordering_key) sequencer._set_batch(batch) # Used only for testing. - def _set_batch_class(self, batch_class): + def _set_batch_class(self, batch_class: Type) -> None: self._batch_class = batch_class # Used only for testing. - def _set_sequencer(self, topic, sequencer, ordering_key=""): + def _set_sequencer( + self, topic: str, sequencer: "Sequencer", ordering_key: str = "" + ) -> None: sequencer_key = (topic, ordering_key) self._sequencers[sequencer_key] = sequencer diff --git a/google/cloud/pubsub_v1/publisher/exceptions.py b/google/cloud/pubsub_v1/publisher/exceptions.py index 89b3790a0..ff0f0713d 100644 --- a/google/cloud/pubsub_v1/publisher/exceptions.py +++ b/google/cloud/pubsub_v1/publisher/exceptions.py @@ -33,7 +33,7 @@ class PublishToPausedOrderingKeyException(Exception): occurred during publish of a batch for that key. """ - def __init__(self, ordering_key): + def __init__(self, ordering_key: str): self.ordering_key = ordering_key super(PublishToPausedOrderingKeyException, self).__init__() diff --git a/google/cloud/pubsub_v1/publisher/flow_controller.py b/google/cloud/pubsub_v1/publisher/flow_controller.py index f899f4d08..fa3d58d33 100644 --- a/google/cloud/pubsub_v1/publisher/flow_controller.py +++ b/google/cloud/pubsub_v1/publisher/flow_controller.py @@ -15,6 +15,7 @@ from collections import OrderedDict import logging import threading +from typing import Optional import warnings from google.cloud.pubsub_v1 import types @@ -45,11 +46,10 @@ class FlowController(object): """A class used to control the flow of messages passing through it. Args: - settings (~google.cloud.pubsub_v1.types.PublishFlowControl): - Desired flow control configuration. + settings: Desired flow control configuration. """ - def __init__(self, settings): + def __init__(self, settings: types.PublishFlowControl): self._settings = settings # Load statistics. They represent the number of messages added, but not @@ -72,14 +72,14 @@ def __init__(self, settings): # The condition for blocking the flow if capacity is exceeded. self._has_capacity = threading.Condition(lock=self._operational_lock) - def add(self, message): + def add(self, message: types.PubsubMessage) -> None: # pytype: disable=module-attr """Add a message to flow control. Adding a message updates the internal load statistics, and an action is taken if these limits are exceeded (depending on the flow control settings). Args: - message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`): + message: The message entering the flow control. Raises: @@ -166,11 +166,13 @@ def add(self, message): self._reserved_slots -= 1 del self._waiting[current_thread] - def release(self, message): + def release( + self, message: types.PubsubMessage # pytype: disable=module-attr + ) -> None: """Release a mesage from flow control. Args: - message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`): + message: The message entering the flow control. """ if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE: @@ -197,7 +199,7 @@ def release(self, message): _LOGGER.debug("Notifying threads waiting to add messages to flow.") self._has_capacity.notify_all() - def _distribute_available_capacity(self): + def _distribute_available_capacity(self) -> None: """Distribute available capacity among the waiting threads in FIFO order. The method assumes that the caller has obtained ``_operational_lock``. @@ -237,13 +239,10 @@ def _distribute_available_capacity(self): self._reserved_bytes += can_give available_bytes -= can_give - def _ready_to_unblock(self): + def _ready_to_unblock(self) -> bool: """Determine if any of the threads waiting to add a message can proceed. The method assumes that the caller has obtained ``_operational_lock``. - - Returns: - bool """ if self._waiting: # It's enough to only check the head of the queue, because FIFO @@ -256,17 +255,15 @@ def _ready_to_unblock(self): return False - def _would_overflow(self, message): + def _would_overflow( + self, message: types.PubsubMessage # pytype: disable=module-attr + ) -> bool: """Determine if accepting a message would exceed flow control limits. The method assumes that the caller has obtained ``_operational_lock``. Args: - message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`): - The message entering the flow control. - - Returns: - bool + message: The message entering the flow control. """ reservation = self._waiting.get(threading.current_thread()) @@ -287,7 +284,9 @@ def _would_overflow(self, message): return size_overflow or msg_count_overflow - def _load_info(self, message_count=None, total_bytes=None): + def _load_info( + self, message_count: Optional[int] = None, total_bytes: Optional[int] = None + ) -> str: """Return the current flow control load information. The caller can optionally adjust some of the values to fit its reporting @@ -296,13 +295,10 @@ def _load_info(self, message_count=None, total_bytes=None): The method assumes that the caller has obtained ``_operational_lock``. Args: - message_count (Optional[int]): + message_count: The value to override the current message count with. - total_bytes (Optional[int]): + total_bytes: The value to override the current total bytes with. - - Returns: - str """ if message_count is None: message_count = self._message_count diff --git a/google/cloud/pubsub_v1/publisher/futures.py b/google/cloud/pubsub_v1/publisher/futures.py index 04748e854..09bb2417c 100644 --- a/google/cloud/pubsub_v1/publisher/futures.py +++ b/google/cloud/pubsub_v1/publisher/futures.py @@ -14,6 +14,8 @@ from __future__ import absolute_import +from typing import Union + from google.cloud.pubsub_v1 import futures @@ -25,32 +27,32 @@ class Future(futures.Future): ID, unless an error occurs. """ - def cancel(self): + def cancel(self) -> bool: """Actions in Pub/Sub generally may not be canceled. This method always returns ``False``. """ return False - def cancelled(self): + def cancelled(self) -> bool: """Actions in Pub/Sub generally may not be canceled. This method always returns ``False``. """ return False - def result(self, timeout=None): + def result(self, timeout: Union[int, float] = None) -> str: """Return the message ID or raise an exception. This blocks until the message has been published successfully and returns the message ID unless an exception is raised. Args: - timeout (Union[int, float]): The number of seconds before this call + timeout: The number of seconds before this call times out and raises TimeoutError. Returns: - str: The message ID. + The message ID. Raises: concurrent.futures.TimeoutError: If the request times out. diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 382c5c38a..badcd78f3 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -20,11 +20,28 @@ import logging import math import threading +import typing +from typing import Sequence, Union from google.cloud.pubsub_v1.subscriber._protocol import helper_threads from google.cloud.pubsub_v1.subscriber._protocol import requests from google.pubsub_v1 import types as gapic_types +if typing.TYPE_CHECKING: # pragma: NO COVER + import queue + from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( + StreamingPullManager, + ) + + +RequestItem = Union[ + requests.AckRequest, + requests.DropRequest, + requests.LeaseRequest, + requests.ModAckRequest, + requests.NackRequest, +] + _LOGGER = logging.getLogger(__name__) _CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher" @@ -51,14 +68,15 @@ class Dispatcher(object): - def __init__(self, manager, queue): + def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"): self._manager = manager self._queue = queue self._thread = None self._operational_lock = threading.Lock() - def start(self): + def start(self) -> None: """Start a thread to dispatch requests queued up by callbacks. + Spawns a thread to run :meth:`dispatch_callback`. """ with self._operational_lock: @@ -78,7 +96,7 @@ def start(self): _LOGGER.debug("Started helper thread %s", thread.name) self._thread = thread - def stop(self): + def stop(self) -> None: with self._operational_lock: if self._thread is not None: # Signal the worker to stop by queueing a "poison pill" @@ -87,17 +105,12 @@ def stop(self): self._thread = None - def dispatch_callback(self, items): + def dispatch_callback(self, items: Sequence[RequestItem]) -> None: """Map the callback request to the appropriate gRPC request. Args: - action (str): The method to be invoked. - kwargs (Dict[str, Any]): The keyword arguments for the method - specified by ``action``. - - Raises: - ValueError: If ``action`` isn't one of the expected actions - "ack", "drop", "lease", "modify_ack_deadline" or "nack". + items: + Queued requests to dispatch. """ batched_commands = collections.defaultdict(list) @@ -119,11 +132,11 @@ def dispatch_callback(self, items): if batched_commands[requests.DropRequest]: self.drop(batched_commands.pop(requests.DropRequest)) - def ack(self, items): + def ack(self, items: Sequence[requests.AckRequest]) -> None: """Acknowledge the given messages. Args: - items(Sequence[AckRequest]): The items to acknowledge. + items: The items to acknowledge. """ # If we got timing information, add it to the histogram. for item in items: @@ -145,31 +158,36 @@ def ack(self, items): # Remove the message from lease management. self.drop(items) - def drop(self, items): + def drop( + self, + items: Sequence[ + Union[requests.AckRequest, requests.DropRequest, requests.NackRequest] + ], + ) -> None: """Remove the given messages from lease management. Args: - items(Sequence[DropRequest]): The items to drop. + items: The items to drop. """ self._manager.leaser.remove(items) ordering_keys = (k.ordering_key for k in items if k.ordering_key) self._manager.activate_ordering_keys(ordering_keys) self._manager.maybe_resume_consumer() - def lease(self, items): + def lease(self, items: Sequence[requests.LeaseRequest]) -> None: """Add the given messages to lease management. Args: - items(Sequence[LeaseRequest]): The items to lease. + items: The items to lease. """ self._manager.leaser.add(items) self._manager.maybe_pause_consumer() - def modify_ack_deadline(self, items): + def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None: """Modify the ack deadline for the given messages. Args: - items(Sequence[ModAckRequest]): The items to modify. + items: The items to modify. """ # We must potentially split the request into multiple smaller requests # to avoid the server-side max request size limit. @@ -184,11 +202,11 @@ def modify_ack_deadline(self, items): ) self._manager.send(request) - def nack(self, items): + def nack(self, items: Sequence[requests.NackRequest]) -> None: """Explicitly deny receipt of messages. Args: - items(Sequence[NackRequest]): The items to deny. + items: The items to deny. """ self.modify_ack_deadline( [requests.ModAckRequest(ack_id=item.ack_id, seconds=0) for item in items] diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py b/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py index fef158965..842e4adc5 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/heartbeater.py @@ -16,6 +16,12 @@ import logging import threading +import typing + +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( + StreamingPullManager, + ) _LOGGER = logging.getLogger(__name__) @@ -27,14 +33,14 @@ class Heartbeater(object): - def __init__(self, manager, period=_DEFAULT_PERIOD): + def __init__(self, manager: "StreamingPullManager", period: int = _DEFAULT_PERIOD): self._thread = None self._operational_lock = threading.Lock() self._manager = manager self._stop_event = threading.Event() self._period = period - def heartbeat(self): + def heartbeat(self) -> None: """Periodically send streaming pull heartbeats. """ while not self._stop_event.is_set(): @@ -44,7 +50,7 @@ def heartbeat(self): _LOGGER.info("%s exiting.", _HEARTBEAT_WORKER_NAME) - def start(self): + def start(self) -> None: with self._operational_lock: if self._thread is not None: raise ValueError("Heartbeater is already running.") @@ -59,7 +65,7 @@ def start(self): _LOGGER.debug("Started helper thread %s", thread.name) self._thread = thread - def stop(self): + def stop(self) -> None: with self._operational_lock: self._stop_event.set() diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py b/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py index 661df7927..fbcab781d 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py @@ -15,6 +15,7 @@ import logging import queue import time +from typing import Any, Callable, List, Sequence import uuid @@ -30,22 +31,25 @@ STOP = uuid.uuid4() -def _get_many(queue_, max_items=None, max_latency=0): +def _get_many( + queue_: queue.Queue, max_items: int = None, max_latency: float = 0 +) -> List[Any]: """Get multiple items from a Queue. Gets at least one (blocking) and at most ``max_items`` items (non-blocking) from a given Queue. Does not mark the items as done. Args: - queue_ (~queue.Queue`): The Queue to get items from. - max_items (int): The maximum number of items to get. If ``None``, then - all available items in the queue are returned. - max_latency (float): The maximum number of seconds to wait for more - than one item from a queue. This number includes the time required - to retrieve the first item. + queue_: The Queue to get items from. + max_items: + The maximum number of items to get. If ``None``, then all available items + in the queue are returned. + max_latency: + The maximum number of seconds to wait for more than one item from a queue. + This number includes the time required to retrieve the first item. Returns: - Sequence[Any]: A sequence of items retrieved from the queue. + A sequence of items retrieved from the queue. """ start = time.time() # Always return at least one item. @@ -67,26 +71,33 @@ class QueueCallbackWorker(object): :attr:`STOP`. Args: - queue (~queue.Queue): A Queue instance, appropriate for crossing the - concurrency boundary implemented by ``executor``. Items will - be popped off (with a blocking ``get()``) until :attr:`STOP` - is encountered. - callback (Callable[Sequence[Any], Any]): A callback that can process - items pulled off of the queue. Multiple items will be passed to - the callback in batches. - max_items (int): The maximum amount of items that will be passed to the - callback at a time. - max_latency (float): The maximum amount of time in seconds to wait for - additional items before executing the callback. + queue: + A Queue instance, appropriate for crossing the concurrency boundary + implemented by ``executor``. Items will be popped off (with a blocking + ``get()``) until :attr:`STOP` is encountered. + callback: + A callback that can process items pulled off of the queue. Multiple items + will be passed to the callback in batches. + max_items: + The maximum amount of items that will be passed to the callback at a time. + max_latency: + The maximum amount of time in seconds to wait for additional items before + executing the callback. """ - def __init__(self, queue, callback, max_items=100, max_latency=0): + def __init__( + self, + queue: queue.Queue, + callback: Callable[[Sequence[Any]], Any], + max_items: int = 100, + max_latency: float = 0, + ): self.queue = queue self._callback = callback self.max_items = max_items self.max_latency = max_latency - def __call__(self): + def __call__(self) -> None: continue_ = True while continue_: items = _get_many( diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py index 0a4a81746..7ffa4b3a0 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from __future__ import absolute_import, division +from typing import Dict, Optional, Union MIN_ACK_DEADLINE = 10 @@ -36,15 +36,15 @@ class Histogram(object): leases in the actual API. """ - def __init__(self, data=None): + def __init__(self, data: Optional[Dict[int, int]] = None): """Instantiate the histogram. Args: - data (Mapping[str, int]): The data strucure to be used to store - the underlying data. The default is an empty dictionary. - This can be set to a dictionary-like object if required - (for example, if a special object is needed for - concurrency reasons). + data: + The data strucure to be used to store the underlying data. The default + is an empty dictionary. This can be set to a dictionary-like object if + required (for example, if a special object is needed for concurrency + reasons). """ # The data is stored as a dictionary, with the keys being the # value being added and the values being the number of times that @@ -60,22 +60,19 @@ def __init__(self, data=None): self._data = data self._len = 0 - def __len__(self): + def __len__(self) -> int: """Return the total number of data points in this histogram. This is cached on a separate counter (rather than computing it using ``sum([v for v in self._data.values()])``) to optimize lookup. Returns: - int: The total number of data points in this histogram. + The total number of data points in this histogram. """ return self._len - def __contains__(self, needle): - """Return True if needle is present in the histogram, False otherwise. - - Returns: - bool: True or False + def __contains__(self, needle: int) -> bool: + """Return ``True`` if needle is present in the histogram, ``False`` otherwise. """ return needle in self._data @@ -85,37 +82,38 @@ def __repr__(self): ) @property - def max(self): + def max(self) -> int: """Return the maximum value in this histogram. If there are no values in the histogram at all, return ``MAX_ACK_DEADLINE``. Returns: - int: The maximum value in the histogram. + The maximum value in the histogram. """ if len(self._data) == 0: return MAX_ACK_DEADLINE return next(iter(reversed(sorted(self._data.keys())))) @property - def min(self): + def min(self) -> int: """Return the minimum value in this histogram. If there are no values in the histogram at all, return ``MIN_ACK_DEADLINE``. Returns: - int: The minimum value in the histogram. + The minimum value in the histogram. """ if len(self._data) == 0: return MIN_ACK_DEADLINE return next(iter(sorted(self._data.keys()))) - def add(self, value): + def add(self, value: Union[int, float]) -> None: """Add the value to this histogram. Args: - value (int): The value. Values outside of - ``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE`` + value: + The value. Values outside of + ``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE`` will be raised to ``MIN_ACK_DEADLINE`` or reduced to ``MAX_ACK_DEADLINE``. """ @@ -131,15 +129,16 @@ def add(self, value): self._data[value] += 1 self._len += 1 - def percentile(self, percent): + def percentile(self, percent: Union[int, float]) -> int: """Return the value that is the Nth precentile in the histogram. Args: - percent (Union[int, float]): The precentile being sought. The - default consumer implementations consistently use ``99``. + percent: + The precentile being sought. The default consumer implementations + consistently use ``99``. Returns: - int: The value corresponding to the requested percentile. + The value corresponding to the requested percentile. """ # Sanity check: Any value over 100 should become 100. if percent >= 100: diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py index 8fd067aaf..c8d7e9365 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py @@ -20,9 +20,16 @@ import random import threading import time +import typing +from typing import Iterable, Sequence, Union from google.cloud.pubsub_v1.subscriber._protocol import requests +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( + StreamingPullManager, + ) + _LOGGER = logging.getLogger(__name__) _LEASE_WORKER_NAME = "Thread-LeaseMaintainer" @@ -34,7 +41,7 @@ class Leaser(object): - def __init__(self, manager): + def __init__(self, manager: "StreamingPullManager"): self._thread = None self._manager = manager @@ -55,21 +62,21 @@ def __init__(self, manager): self._stop_event = threading.Event() @property - def message_count(self): - """int: The number of leased messages.""" + def message_count(self) -> int: + """The number of leased messages.""" return len(self._leased_messages) @property - def ack_ids(self): - """Sequence[str]: The ack IDs of all leased messages.""" + def ack_ids(self) -> Sequence[str]: + """The ack IDs of all leased messages.""" return self._leased_messages.keys() @property - def bytes(self): - """int: The total size, in bytes, of all leased messages.""" + def bytes(self) -> int: + """The total size, in bytes, of all leased messages.""" return self._bytes - def add(self, items): + def add(self, items: Iterable[requests.LeaseRequest]) -> None: """Add messages to be managed by the leaser.""" with self._add_remove_lock: for item in items: @@ -85,12 +92,11 @@ def add(self, items): else: _LOGGER.debug("Message %s is already lease managed", item.ack_id) - def start_lease_expiry_timer(self, ack_ids): + def start_lease_expiry_timer(self, ack_ids: Iterable[str]) -> None: """Start the lease expiry timer for `items`. Args: - items (Sequence[str]): Sequence of ack-ids for which to start - lease expiry timers. + items: Sequence of ack-ids for which to start lease expiry timers. """ with self._add_remove_lock: for ack_id in ack_ids: @@ -102,7 +108,12 @@ def start_lease_expiry_timer(self, ack_ids): sent_time=time.time() ) - def remove(self, items): + def remove( + self, + items: Iterable[ + Union[requests.AckRequest, requests.DropRequest, requests.NackRequest] + ], + ) -> None: """Remove messages from lease management.""" with self._add_remove_lock: # Remove the ack ID from lease management, and decrement the @@ -117,7 +128,7 @@ def remove(self, items): _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes) self._bytes = 0 - def maintain_leases(self): + def maintain_leases(self) -> None: """Maintain all of the leases being managed. This method modifies the ack deadline for all of the managed @@ -188,7 +199,7 @@ def maintain_leases(self): _LOGGER.info("%s exiting.", _LEASE_WORKER_NAME) - def start(self): + def start(self) -> None: with self._operational_lock: if self._thread is not None: raise ValueError("Leaser is already running.") @@ -203,7 +214,7 @@ def start(self): _LOGGER.debug("Started helper thread %s", thread.name) self._thread = thread - def stop(self): + def stop(self) -> None: with self._operational_lock: self._stop_event.set() diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py index bab15f218..82d5ca376 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py @@ -13,6 +13,11 @@ # limitations under the License. import collections +import typing +from typing import Any, Callable, Iterable, Optional + +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud.pubsub_v1 import subscriber class MessagesOnHold(object): @@ -41,27 +46,25 @@ def __init__(self): self._pending_ordered_messages = {} @property - def size(self): - """Return the number of messages on hold across ordered and unordered - messages. + def size(self) -> int: + """Return the number of messages on hold across ordered and unordered messages. Note that this object may still store information about ordered messages in flight even if size is zero. Returns: - int: The size value. + The size value. """ return self._size - def get(self): + def get(self) -> Optional["subscriber.message.Message"]: """ Gets a message from the on-hold queue. A message with an ordering key wont be returned if there's another message with the same key in flight. Returns: - Optional[google.cloud.pubsub_v1.subscriber.message.Message]: A message - that hasn't been sent to the user yet or None if there are no - messages available. + A message that hasn't been sent to the user yet or ``None`` if there are no + messages available. """ while self._messages_on_hold: msg = self._messages_on_hold.popleft() @@ -88,17 +91,20 @@ def get(self): return None - def put(self, message): + def put(self, message: "subscriber.message.Message") -> None: """Put a message on hold. Args: - message (google.cloud.pubsub_v1.subscriber.message.Message): The - message to put on hold. + message: The message to put on hold. """ self._messages_on_hold.append(message) self._size = self._size + 1 - def activate_ordering_keys(self, ordering_keys, schedule_message_callback): + def activate_ordering_keys( + self, + ordering_keys: Iterable[str], + schedule_message_callback: Callable[["subscriber.message.Message"], Any], + ) -> None: """Send the next message in the queue for each of the passed-in ordering keys, if they exist. Clean up state for keys that no longer have any queued messages. @@ -107,9 +113,9 @@ def activate_ordering_keys(self, ordering_keys, schedule_message_callback): detail about the impact of this method on load. Args: - ordering_keys(Sequence[str]): A sequence of ordering keys to - activate. May be empty. - schedule_message_callback(Callable[google.cloud.pubsub_v1.subscriber.message.Message]): + ordering_keys: + The ordering keys to activate. May be empty. + schedule_message_callback: The callback to call to schedule a message to be sent to the user. """ for key in ordering_keys: @@ -126,18 +132,19 @@ def activate_ordering_keys(self, ordering_keys, schedule_message_callback): # No more messages for this ordering key, so do clean-up. self._clean_up_ordering_key(key) - def _get_next_for_ordering_key(self, ordering_key): + def _get_next_for_ordering_key( + self, ordering_key: str + ) -> Optional["subscriber.message.Message"]: """Get next message for ordering key. The client should call clean_up_ordering_key() if this method returns None. Args: - ordering_key (str): Ordering key for which to get the next message. + ordering_key: Ordering key for which to get the next message. Returns: - google.cloud.pubsub_v1.subscriber.message.Message|None: The - next message for this ordering key or None if there aren't any. + The next message for this ordering key or None if there aren't any. """ queue_for_key = self._pending_ordered_messages.get(ordering_key) if queue_for_key: @@ -145,11 +152,11 @@ def _get_next_for_ordering_key(self, ordering_key): return queue_for_key.popleft() return None - def _clean_up_ordering_key(self, ordering_key): + def _clean_up_ordering_key(self, ordering_key: str) -> None: """Clean up state for an ordering key with no pending messages. Args: - ordering_key (str): The ordering key to clean up. + ordering_key: The ordering key to clean up. """ message_queue = self._pending_ordered_messages.get(ordering_key) assert ( diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py index 58d53a61d..7481d95a9 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/requests.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/requests.py @@ -12,28 +12,36 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Base class for concurrency policy.""" +from typing import NamedTuple, Optional -from __future__ import absolute_import, division - -import collections # Namedtuples for management requests. Used by the Message class to communicate # items of work back to the policy. -AckRequest = collections.namedtuple( - "AckRequest", ["ack_id", "byte_size", "time_to_ack", "ordering_key"] -) +class AckRequest(NamedTuple): + ack_id: str + byte_size: int + time_to_ack: float + ordering_key: Optional[str] + + +class DropRequest(NamedTuple): + ack_id: str + byte_size: int + ordering_key: Optional[str] + + +class LeaseRequest(NamedTuple): + ack_id: str + byte_size: int + ordering_key: Optional[str] -DropRequest = collections.namedtuple( - "DropRequest", ["ack_id", "byte_size", "ordering_key"] -) -LeaseRequest = collections.namedtuple( - "LeaseRequest", ["ack_id", "byte_size", "ordering_key"] -) +class ModAckRequest(NamedTuple): + ack_id: str + seconds: float -ModAckRequest = collections.namedtuple("ModAckRequest", ["ack_id", "seconds"]) -NackRequest = collections.namedtuple( - "NackRequest", ["ack_id", "byte_size", "ordering_key"] -) +class NackRequest(NamedTuple): + ack_id: str + byte_size: int + ordering_key: Optional[str] diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index da027fcbe..018917b90 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -19,6 +19,8 @@ import itertools import logging import threading +import typing +from typing import Any, Callable, Iterable import uuid import grpc @@ -36,6 +38,11 @@ import google.cloud.pubsub_v1.subscriber.scheduler from google.pubsub_v1 import types as gapic_types +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud.pubsub_v1 import subscriber + from google.cloud.pubsub_v1.subscriber.scheduler import Scheduler + + _LOGGER = logging.getLogger(__name__) _REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" _RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" @@ -55,11 +62,11 @@ """The load threshold below which to resume the incoming message stream.""" -def _wrap_as_exception(maybe_exception): +def _wrap_as_exception(maybe_exception: Any) -> BaseException: """Wrap an object as a Python exception, if needed. Args: - maybe_exception (Any): The object to wrap, usually a gRPC exception class. + maybe_exception: The object to wrap, usually a gRPC exception class. Returns: The argument itself if an instance of ``BaseException``, otherwise @@ -73,13 +80,17 @@ def _wrap_as_exception(maybe_exception): return Exception(maybe_exception) -def _wrap_callback_errors(callback, on_callback_error, message): +def _wrap_callback_errors( + callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], + on_callback_error: Callable[[Exception], Any], + message: "google.cloud.pubsub_v1.subscriber.message.Message", +): """Wraps a user callback so that if an exception occurs the message is nacked. Args: - callback (Callable[None, Message]): The user callback. - message (~Message): The Pub/Sub message. + callback: The user callback. + message: The Pub/Sub message. """ try: callback(message) @@ -99,21 +110,21 @@ class StreamingPullManager(object): leasing them, and scheduling them to be processed. Args: - client (~.pubsub_v1.subscriber.client): The subscriber client used - to create this instance. - subscription (str): The name of the subscription. The canonical - format for this is + client: + The subscriber client used to create this instance. + subscription: + The name of the subscription. The canonical format for this is ``projects/{project}/subscriptions/{subscription}``. - flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow - control settings. - use_legacy_flow_control (bool): + flow_control: + The flow control settings. + scheduler: + The scheduler to use to process messages. If not provided, a thread + pool-based scheduler will be used. + use_legacy_flow_control: If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, though client-side flow control is still enabled. If set to ``False`` (default), both server-side and client-side flow control are enabled. - scheduler (~google.cloud.pubsub_v1.scheduler.Scheduler): The scheduler - to use to process messages. If not provided, a thread pool-based - scheduler will be used. - await_callbacks_on_shutdown (bool): + await_callbacks_on_shutdown: If ``True``, the shutdown thread will wait until all scheduler threads terminate and only then proceed with shutting down the remaining running helper threads. @@ -129,12 +140,12 @@ class StreamingPullManager(object): def __init__( self, - client, - subscription, - flow_control=types.FlowControl(), - scheduler=None, - use_legacy_flow_control=False, - await_callbacks_on_shutdown=False, + client: "subscriber.Client", + subscription: str, + flow_control: types.FlowControl = types.FlowControl(), + scheduler: "Scheduler" = None, + use_legacy_flow_control: bool = False, + await_callbacks_on_shutdown: bool = False, ): self._client = client self._subscription = subscription @@ -191,8 +202,8 @@ def __init__( self._heartbeater = None @property - def is_active(self): - """bool: True if this manager is actively streaming. + def is_active(self) -> bool: + """``True`` if this manager is actively streaming. Note that ``False`` does not indicate this is complete shut down, just that it stopped getting new messages. @@ -200,42 +211,35 @@ def is_active(self): return self._consumer is not None and self._consumer.is_active @property - def flow_control(self): - """google.cloud.pubsub_v1.types.FlowControl: The active flow control - settings.""" + def flow_control(self) -> types.FlowControl: + """The active flow control settings.""" return self._flow_control @property - def dispatcher(self): - """google.cloud.pubsub_v1.subscriber._protocol.dispatcher.Dispatcher: - The dispatcher helper. - """ + def dispatcher(self) -> dispatcher.Dispatcher: + """The dispatcher helper.""" return self._dispatcher @property - def leaser(self): - """google.cloud.pubsub_v1.subscriber._protocol.leaser.Leaser: - The leaser helper. - """ + def leaser(self) -> leaser.Leaser: + """The leaser helper.""" return self._leaser @property - def ack_histogram(self): - """google.cloud.pubsub_v1.subscriber._protocol.histogram.Histogram: - The histogram tracking time-to-acknowledge. - """ + def ack_histogram(self) -> histogram.Histogram: + """The histogram tracking time-to-acknowledge.""" return self._ack_histogram @property - def ack_deadline(self): + def ack_deadline(self) -> float: """Return the current ACK deadline based on historical data without updating it. Returns: - int: The ack deadline. + The ack deadline. """ return self._obtain_ack_deadline(maybe_update=False) - def _obtain_ack_deadline(self, maybe_update: bool) -> int: + def _obtain_ack_deadline(self, maybe_update: bool) -> float: """The actual `ack_deadline` implementation. This method is "sticky". It will only perform the computations to check on the @@ -243,12 +247,12 @@ def _obtain_ack_deadline(self, maybe_update: bool) -> int: time-to-ack data has gained a significant amount of new information. Args: - maybe_update (bool): + maybe_update: If ``True``, also update the current ACK deadline before returning it if enough new ACK data has been gathered. Returns: - int: The current ACK deadline in seconds to use. + The current ACK deadline in seconds to use. """ with self._ack_deadline_lock: if not maybe_update: @@ -273,7 +277,7 @@ def _obtain_ack_deadline(self, maybe_update: bool) -> int: return self._ack_deadline @property - def load(self): + def load(self) -> float: """Return the current load. The load is represented as a float, where 1.0 represents having @@ -287,7 +291,7 @@ def load(self): running room on setting A if setting B is over.) Returns: - float: The load value. + The load value. """ if self._leaser is None: return 0.0 @@ -307,15 +311,17 @@ def load(self): ] ) - def add_close_callback(self, callback): + def add_close_callback( + self, callback: Callable[["StreamingPullManager", Any], Any], + ) -> None: """Schedules a callable when the manager closes. Args: - callback (Callable): The method to call. + The method to call. """ self._close_callbacks.append(callback) - def activate_ordering_keys(self, ordering_keys): + def activate_ordering_keys(self, ordering_keys: Iterable[str]) -> None: """Send the next message in the queue for each of the passed-in ordering keys, if they exist. Clean up state for keys that no longer have any queued messages. @@ -326,8 +332,8 @@ def activate_ordering_keys(self, ordering_keys): This decision is by design because it simplifies MessagesOnHold. Args: - ordering_keys(Sequence[str]): A sequence of ordering keys to - activate. May be empty. + ordering_keys: + A sequence of ordering keys to activate. May be empty. """ with self._pause_resume_lock: if self._scheduler is None: @@ -337,7 +343,7 @@ def activate_ordering_keys(self, ordering_keys): ordering_keys, self._schedule_message_on_hold ) - def maybe_pause_consumer(self): + def maybe_pause_consumer(self) -> None: """Check the current load and pause the consumer if needed.""" with self._pause_resume_lock: if self.load >= _MAX_LOAD: @@ -347,7 +353,7 @@ def maybe_pause_consumer(self): ) self._consumer.pause() - def maybe_resume_consumer(self): + def maybe_resume_consumer(self) -> None: """Check the load and held messages and resume the consumer if needed. If there are messages held internally, release those messages before @@ -375,7 +381,7 @@ def maybe_resume_consumer(self): else: _LOGGER.debug("Did not resume, current load is %.2f.", self.load) - def _maybe_release_messages(self): + def _maybe_release_messages(self) -> None: """Release (some of) the held messages if the current load allows for it. The method tries to release as many messages as the current leaser load @@ -397,15 +403,15 @@ def _maybe_release_messages(self): released_ack_ids.append(msg.ack_id) self._leaser.start_lease_expiry_timer(released_ack_ids) - def _schedule_message_on_hold(self, msg): - """Schedule a message on hold to be sent to the user and change - on-hold-bytes. + def _schedule_message_on_hold( + self, msg: "google.cloud.pubsub_v1.subscriber.message.Message" + ): + """Schedule a message on hold to be sent to the user and change on-hold-bytes. The method assumes the caller has acquired the ``_pause_resume_lock``. Args: - msg (google.cloud.pubsub_v1.message.Message): The message to - schedule to be sent to the user. + msg: The message to schedule to be sent to the user. """ assert msg, "Message must not be None." @@ -426,13 +432,11 @@ def _schedule_message_on_hold(self, msg): ) self._scheduler.schedule(self._callback, msg) - def _send_unary_request(self, request): - """Send a request using a separate unary request instead of over the - stream. + def _send_unary_request(self, request: gapic_types.StreamingPullRequest) -> None: + """Send a request using a separate unary request instead of over the stream. Args: - request (gapic_types.StreamingPullRequest): The stream request to be - mapped into unary requests. + request: The stream request to be mapped into unary requests. """ if request.ack_ids: self._client.acknowledge( @@ -456,7 +460,7 @@ def _send_unary_request(self, request): _LOGGER.debug("Sent request(s) over unary RPC.") - def send(self, request): + def send(self, request: gapic_types.StreamingPullRequest) -> None: """Queue a request to be sent to the RPC. If a RetryError occurs, the manager shutdown is triggered, and the @@ -481,11 +485,11 @@ def send(self, request): self._on_rpc_done(exc) raise - def heartbeat(self): + def heartbeat(self) -> bool: """Sends an empty request over the streaming pull RPC. Returns: - bool: If a heartbeat request has actually been sent. + If a heartbeat request has actually been sent. """ if self._rpc is not None and self._rpc.is_active: self._rpc.send(gapic_types.StreamingPullRequest()) @@ -493,14 +497,18 @@ def heartbeat(self): return False - def open(self, callback, on_callback_error): + def open( + self, + callback: Callable[["google.cloud.pubsub_v1.subscriber.message.Message"], Any], + on_callback_error: Callable[[Exception], Any], + ) -> None: """Begin consuming messages. Args: - callback (Callable[None, google.cloud.pubsub_v1.message.Message]): + callback: A callback that will be called for each message received on the stream. - on_callback_error (Callable[Exception]): + on_callback_error: A callable that will be called if an exception is raised in the provided `callback`. """ @@ -536,10 +544,13 @@ def open(self, callback, on_callback_error): ) # Create references to threads + # pytype: disable=wrong-arg-types + # (pytype incorrectly complains about "self" not being the right argument type) self._dispatcher = dispatcher.Dispatcher(self, self._scheduler.queue) self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response) self._leaser = leaser.Leaser(self) self._heartbeater = heartbeater.Heartbeater(self) + # pytype: enable=wrong-arg-types # Start the thread to pass the requests. self._dispatcher.start() @@ -553,7 +564,7 @@ def open(self, callback, on_callback_error): # Start the stream heartbeater thread. self._heartbeater.start() - def close(self, reason=None): + def close(self, reason: Any = None) -> None: """Stop consuming messages and shutdown all helper threads. This method is idempotent. Additional calls will have no effect. @@ -562,7 +573,8 @@ def close(self, reason=None): thread. Args: - reason (Any): The reason to close this. If ``None``, this is considered + reason: + The reason to close this. If ``None``, this is considered an "intentional" shutdown. This is passed to the callbacks specified via :meth:`add_close_callback`. """ @@ -574,12 +586,13 @@ def close(self, reason=None): ) self._regular_shutdown_thread.start() - def _shutdown(self, reason=None): + def _shutdown(self, reason: Any = None) -> None: """Run the actual shutdown sequence (stop the stream and all helper threads). Args: - reason (Any): The reason to close the stream. If ``None``, this is - considered an "intentional" shutdown. + reason: + The reason to close the stream. If ``None``, this is considered + an "intentional" shutdown. """ with self._closing: if self._closed: @@ -637,19 +650,20 @@ def _shutdown(self, reason=None): for callback in self._close_callbacks: callback(self, reason) - def _get_initial_request(self, stream_ack_deadline_seconds): + def _get_initial_request( + self, stream_ack_deadline_seconds: int + ) -> gapic_types.StreamingPullRequest: """Return the initial request for the RPC. This defines the initial request that must always be sent to Pub/Sub immediately upon opening the subscription. Args: - stream_ack_deadline_seconds (int): + stream_ack_deadline_seconds: The default message acknowledge deadline for the stream. Returns: - google.pubsub_v1.types.StreamingPullRequest: A request - suitable for being the first request on the stream (and not + A request suitable for being the first request on the stream (and not suitable for any other purpose). """ # Any ack IDs that are under lease management need to have their @@ -679,7 +693,7 @@ def _get_initial_request(self, stream_ack_deadline_seconds): # Return the initial request. return request - def _on_response(self, response): + def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: """Process all received Pub/Sub messages. For each message, send a modified acknowledgment request to the @@ -739,7 +753,7 @@ def _on_response(self, response): self.maybe_pause_consumer() - def _should_recover(self, exception): + def _should_recover(self, exception: Exception) -> bool: """Determine if an error on the RPC stream should be recovered. If the exception is one of the retryable exceptions, this will signal @@ -748,7 +762,7 @@ def _should_recover(self, exception): This will cause the stream to exit when it returns :data:`False`. Returns: - bool: Indicates if the caller should recover or shut down. + Indicates if the caller should recover or shut down. Will be :data:`True` if the ``exception`` is "acceptable", i.e. in a list of retryable / idempotent exceptions. """ @@ -761,7 +775,7 @@ def _should_recover(self, exception): _LOGGER.info("Observed non-recoverable stream error %s", exception) return False - def _should_terminate(self, exception): + def _should_terminate(self, exception: Exception) -> bool: """Determine if an error on the RPC stream should be terminated. If the exception is one of the terminating exceptions, this will signal @@ -770,7 +784,7 @@ def _should_terminate(self, exception): This will cause the stream to exit when it returns :data:`True`. Returns: - bool: Indicates if the caller should terminate or attempt recovery. + Indicates if the caller should terminate or attempt recovery. Will be :data:`True` if the ``exception`` is "acceptable", i.e. in a list of terminating exceptions. """ @@ -781,7 +795,7 @@ def _should_terminate(self, exception): _LOGGER.info("Observed non-terminating stream error %s", exception) return False - def _on_rpc_done(self, future): + def _on_rpc_done(self, future: Any) -> None: """Triggered whenever the underlying RPC terminates without recovery. This is typically triggered from one of two threads: the background diff --git a/google/cloud/pubsub_v1/subscriber/client.py b/google/cloud/pubsub_v1/subscriber/client.py index c4b229a17..099a65318 100644 --- a/google/cloud/pubsub_v1/subscriber/client.py +++ b/google/cloud/pubsub_v1/subscriber/client.py @@ -16,6 +16,8 @@ import os import pkg_resources +import typing +from typing import Any, Callable, Optional, Sequence, Union from google.auth.credentials import AnonymousCredentials from google.oauth2 import service_account @@ -26,6 +28,9 @@ from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager from google.pubsub_v1.services.subscriber import client as subscriber_client +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud.pubsub_v1 import subscriber + try: __version__ = pkg_resources.get_distribution("google-cloud-pubsub").version @@ -50,7 +55,7 @@ class Client(object): get sensible defaults. Args: - kwargs (dict): Any additional arguments provided are sent as keyword + kwargs: Any additional arguments provided are sent as keyword keyword arguments to the underlying :class:`~google.cloud.pubsub_v1.gapic.subscriber_client.SubscriberClient`. Generally you should not need to set additional keyword @@ -72,7 +77,7 @@ class Client(object): ) """ - def __init__(self, **kwargs): + def __init__(self, **kwargs: Any): # Sanity check: Is our goal to use the emulator? # If so, create a grpc insecure channel with the emulator host # as the target. @@ -88,13 +93,12 @@ def __init__(self, **kwargs): self._closed = False @classmethod - def from_service_account_file(cls, filename, **kwargs): + def from_service_account_file(cls, filename: str, **kwargs: Any) -> "Client": """Creates an instance of this client using the provided credentials file. Args: - filename (str): The path to the service account private key json - file. + filename: The path to the service account private key json file. kwargs: Additional arguments to pass to the constructor. Returns: @@ -108,16 +112,16 @@ def from_service_account_file(cls, filename, **kwargs): from_service_account_json = from_service_account_file @property - def target(self): + def target(self) -> str: """Return the target (where the API is). Returns: - str: The location of the API. + The location of the API. """ return self._target @property - def api(self): + def api(self) -> subscriber_client.SubscriberClient: """The underlying gapic API client.""" return self._api @@ -131,13 +135,13 @@ def closed(self) -> bool: def subscribe( self, - subscription, - callback, - flow_control=(), - scheduler=None, - use_legacy_flow_control=False, - await_callbacks_on_shutdown=False, - ): + subscription: str, + callback: Callable[["subscriber.message.Message"], Any], + flow_control: Union[types.FlowControl, Sequence] = (), + scheduler: Optional["subscriber.scheduler.Scheduler"] = None, + use_legacy_flow_control: bool = False, + await_callbacks_on_shutdown: bool = False, + ) -> futures.StreamingPullFuture: """Asynchronously start receiving messages on a given subscription. This method starts a background thread to begin pulling messages from @@ -201,25 +205,25 @@ def callback(message): future.result() # Block until the shutdown is complete. Args: - subscription (str): The name of the subscription. The - subscription should have already been created (for example, - by using :meth:`create_subscription`). - callback (Callable[~google.cloud.pubsub_v1.subscriber.message.Message]): + subscription: + The name of the subscription. The subscription should have already been + created (for example, by using :meth:`create_subscription`). + callback: The callback function. This function receives the message as its only argument and will be called from a different thread/ process depending on the scheduling strategy. - flow_control (~google.cloud.pubsub_v1.types.FlowControl): The flow control - settings. Use this to prevent situations where you are + flow_control: + The flow control settings. Use this to prevent situations where you are inundated with too many messages at once. - scheduler (~google.cloud.pubsub_v1.subscriber.scheduler.Scheduler): An optional - *scheduler* to use when executing the callback. This controls - how callbacks are executed concurrently. This object must not be shared - across multiple SubscriberClients. + scheduler: + An optional *scheduler* to use when executing the callback. This + controls how callbacks are executed concurrently. This object must not + be shared across multiple ``SubscriberClient`` instances. use_legacy_flow_control (bool): If set to ``True``, flow control at the Cloud Pub/Sub server is disabled, though client-side flow control is still enabled. If set to ``False`` (default), both server-side and client-side flow control are enabled. - await_callbacks_on_shutdown (bool): + await_callbacks_on_shutdown: If ``True``, after canceling the returned future, the latter's ``result()`` method will block until the background stream and its helper threads have been terminated, and all currently executing message @@ -232,8 +236,7 @@ def callback(message): running at that point. Returns: - A :class:`~google.cloud.pubsub_v1.subscriber.futures.StreamingPullFuture` - instance that can be used to manage the background stream. + A future instance that can be used to manage the background stream. """ flow_control = types.FlowControl(*flow_control) @@ -252,7 +255,7 @@ def callback(message): return future - def close(self): + def close(self) -> None: """Close the underlying channel to release socket resources. After a channel has been closed, the client instance cannot be used @@ -263,7 +266,7 @@ def close(self): self.api._transport.grpc_channel.close() self._closed = True - def __enter__(self): + def __enter__(self) -> "Client": if self._closed: raise RuntimeError("Closed subscriber cannot be used as context manager.") return self diff --git a/google/cloud/pubsub_v1/subscriber/futures.py b/google/cloud/pubsub_v1/subscriber/futures.py index 18298b956..eec9590ed 100644 --- a/google/cloud/pubsub_v1/subscriber/futures.py +++ b/google/cloud/pubsub_v1/subscriber/futures.py @@ -14,9 +14,18 @@ from __future__ import absolute_import +import typing +from typing import Any + from google.cloud.pubsub_v1 import futures +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager import ( + StreamingPullManager, + ) + + class StreamingPullFuture(futures.Future): """Represents a process that asynchronously performs streaming pull and schedules messages to be processed. @@ -26,13 +35,13 @@ class StreamingPullFuture(futures.Future): the calling thread to block indefinitely. """ - def __init__(self, manager): + def __init__(self, manager: "StreamingPullManager"): super(StreamingPullFuture, self).__init__() self.__manager = manager self.__manager.add_close_callback(self._on_close_callback) self.__cancelled = False - def _on_close_callback(self, manager, result): + def _on_close_callback(self, manager: "StreamingPullManager", result: Any): if self.done(): # The future has already been resolved in a different thread, # nothing to do on the streaming pull manager shutdown. @@ -57,9 +66,9 @@ def cancel(self): self.__cancelled = True return self.__manager.close() - def cancelled(self): + def cancelled(self) -> bool: """ - returns: - bool: ``True`` if the subscription has been cancelled. + Returns: + ``True`` if the subscription has been cancelled. """ return self.__cancelled diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index 02ffd9984..4a08257a6 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -18,9 +18,17 @@ import json import math import time +import typing +from typing import Optional from google.cloud.pubsub_v1.subscriber._protocol import requests +if typing.TYPE_CHECKING: # pragma: NO COVER + import datetime + import queue + from google.cloud.pubsub_v1 import types + from google.protobuf.internal import containers + _MESSAGE_REPR = """\ Message {{ @@ -30,18 +38,19 @@ }}""" -def _indent(lines, prefix=" "): +def _indent(lines: str, prefix: str = " ") -> str: """Indent some text. Note that this is present as ``textwrap.indent``, but not in Python 2. Args: - lines (str): The newline delimited string to be indented. - prefix (Optional[str]): The prefix to indent each line with. Default - to two spaces. + lines: + The newline delimited string to be indented. + prefix: + The prefix to indent each line with. Defaults to two spaces. Returns: - str: The newly indented content. + The newly indented content. """ indented = [] for line in lines.split("\n"): @@ -60,17 +69,25 @@ class Message(object): :class:`~.pubsub_v1.subscriber._consumer.Consumer`.) Attributes: - message_id (str): The message ID. In general, you should not need - to use this directly. - data (bytes): The data in the message. Note that this will be a - :class:`bytes`, not a text string. - attributes (.ScalarMapContainer): The attributes sent along with the - message. See :attr:`attributes` for more information on this type. - publish_time (datetime): The time that this message was originally - published. + message_id: + The message ID. In general, you should not need to use this directly. + data: + The data in the message. Note that this will be a :class:`bytes`, + not a text string. + attributes: + The attributes sent along with the message. See :attr:`attributes` for more + information on this type. + publish_time: + The time that this message was originally published. """ - def __init__(self, message, ack_id, delivery_attempt, request_queue): + def __init__( # pytype: disable=module-attr + self, + message: "types.PubsubMessage._meta._pb", + ack_id: str, + delivery_attempt: int, + request_queue: "queue.Queue", + ): """Construct the Message. .. note:: @@ -79,19 +96,20 @@ def __init__(self, message, ack_id, delivery_attempt, request_queue): responsibility of :class:`BasePolicy` subclasses to do so. Args: - message (`pubsub_v1.types.PubsubMessage._meta._pb`): + message: The message received from Pub/Sub. For performance reasons it should be the raw protobuf message normally wrapped by :class:`~pubsub_v1.types.PubsubMessage`. A raw message can be obtained from a :class:`~pubsub_v1.types.PubsubMessage` instance through the latter's ``._pb`` attribute. - ack_id (str): The ack_id received from Pub/Sub. - delivery_attempt (int): The delivery attempt counter received - from Pub/Sub if a DeadLetterPolicy is set on the subscription, - and zero otherwise. - request_queue (queue.Queue): A queue provided by the policy that - can accept requests; the policy is responsible for handling - those requests. + ack_id: + The ack_id received from Pub/Sub. + delivery_attempt: + The delivery attempt counter received from Pub/Sub if a DeadLetterPolicy + is set on the subscription, and zero otherwise. + request_queue: + A queue provided by the policy that can accept requests; the policy is + responsible for handling those requests. """ self._message = message self._ack_id = ack_id @@ -131,12 +149,12 @@ def __repr__(self): return _MESSAGE_REPR.format(abbv_data, str(self.ordering_key), pretty_attrs) @property - def attributes(self): + def attributes(self) -> "containers.ScalarMap": """Return the attributes of the underlying Pub/Sub Message. .. warning:: - A ``ScalarMapContainer`` behaves slightly differently than a + A ``ScalarMap`` behaves slightly differently than a ``dict``. For a Pub / Sub message this is a ``string->string`` map. When trying to access a value via ``map['key']``, if the key is not in the map, then the default value for the string type will @@ -144,47 +162,47 @@ def attributes(self): to just cast the map to a ``dict`` or to one use ``map.get``. Returns: - .ScalarMapContainer: The message's attributes. This is a - ``dict``-like object provided by ``google.protobuf``. + The message's attributes. This is a ``dict``-like object provided by + ``google.protobuf``. """ return self._attributes @property - def data(self): + def data(self) -> bytes: """Return the data for the underlying Pub/Sub Message. Returns: - bytes: The message data. This is always a bytestring; if you - want a text string, call :meth:`bytes.decode`. + The message data. This is always a bytestring; if you want a text string, + call :meth:`bytes.decode`. """ return self._data @property - def publish_time(self): + def publish_time(self) -> "datetime.datetime": """Return the time that the message was originally published. Returns: - datetime: The date and time that the message was published. + The date and time that the message was published. """ return self._publish_time @property - def ordering_key(self): - """str: the ordering key used to publish the message.""" + def ordering_key(self) -> str: + """The ordering key used to publish the message.""" return self._ordering_key @property - def size(self): + def size(self) -> int: """Return the size of the underlying message, in bytes.""" return self._size @property - def ack_id(self): - """str: the ID used to ack the message.""" + def ack_id(self) -> str: + """the ID used to ack the message.""" return self._ack_id @property - def delivery_attempt(self): + def delivery_attempt(self) -> Optional[int]: """The delivery attempt counter is 1 + (the sum of number of NACKs and number of ack_deadline exceeds) for this message. It is set to None if a DeadLetterPolicy is not set on the subscription. @@ -199,11 +217,11 @@ def delivery_attempt(self): is calculated at best effort and is approximate. Returns: - Optional[int]: The delivery attempt counter or None. + The delivery attempt counter or ``None``. """ return self._delivery_attempt - def ack(self): + def ack(self) -> None: """Acknowledge the given message. Acknowledging a message in Pub/Sub means that you are done @@ -227,7 +245,7 @@ def ack(self): ) ) - def drop(self): + def drop(self) -> None: """Release the message from lease management. This informs the policy to no longer hold on to the lease for this @@ -246,7 +264,7 @@ def drop(self): ) ) - def modify_ack_deadline(self, seconds): + def modify_ack_deadline(self, seconds: int) -> None: """Resets the deadline for acknowledgement. New deadline will be the given value of seconds from now. @@ -257,15 +275,16 @@ def modify_ack_deadline(self, seconds): :class:`~.pubsub_v1.subcriber._consumer.Consumer`. Args: - seconds (int): The number of seconds to set the lease deadline - to. This should be between 0 and 600. Due to network latency, - values below 10 are advised against. + seconds: + The number of seconds to set the lease deadline to. This should be + between 0 and 600. Due to network latency, values below 10 are advised + against. """ self._request_queue.put( requests.ModAckRequest(ack_id=self._ack_id, seconds=seconds) ) - def nack(self): + def nack(self) -> None: """Decline to acknowldge the given message. This will cause the message to be re-delivered to the subscription. diff --git a/google/cloud/pubsub_v1/subscriber/scheduler.py b/google/cloud/pubsub_v1/subscriber/scheduler.py index b8f2b592c..3db7ed73e 100644 --- a/google/cloud/pubsub_v1/subscriber/scheduler.py +++ b/google/cloud/pubsub_v1/subscriber/scheduler.py @@ -21,8 +21,13 @@ import abc import concurrent.futures import queue +import typing +from typing import Callable, List, Optional import warnings +if typing.TYPE_CHECKING: # pragma: NO COVER + from google.cloud import pubsub_v1 + class Scheduler(metaclass=abc.ABCMeta): """Abstract base class for schedulers. @@ -32,7 +37,7 @@ class Scheduler(metaclass=abc.ABCMeta): @property @abc.abstractmethod - def queue(self): # pragma: NO COVER + def queue(self) -> queue.Queue: # pragma: NO COVER """Queue: A concurrency-safe queue specific to the underlying concurrency implementation. @@ -41,13 +46,13 @@ def queue(self): # pragma: NO COVER raise NotImplementedError @abc.abstractmethod - def schedule(self, callback, *args, **kwargs): # pragma: NO COVER + def schedule(self, callback: Callable, *args, **kwargs) -> None: # pragma: NO COVER """Schedule the callback to be called asynchronously. Args: - callback (Callable): The function to call. - args: Positional arguments passed to the function. - kwargs: Key-word arguments passed to the function. + callback: The function to call. + args: Positional arguments passed to the callback. + kwargs: Key-word arguments passed to the callback. Returns: None @@ -55,26 +60,27 @@ def schedule(self, callback, *args, **kwargs): # pragma: NO COVER raise NotImplementedError @abc.abstractmethod - def shutdown(self, await_msg_callbacks=False): # pragma: NO COVER + def shutdown( + self, await_msg_callbacks: bool = False + ) -> List["pubsub_v1.subscriber.message.Message"]: # pragma: NO COVER """Shuts down the scheduler and immediately end all pending callbacks. Args: - await_msg_callbacks (bool): + await_msg_callbacks: If ``True``, the method will block until all currently executing callbacks are done processing. If ``False`` (default), the method will not wait for the currently running callbacks to complete. Returns: - List[pubsub_v1.subscriber.message.Message]: - The messages submitted to the scheduler that were not yet dispatched - to their callbacks. - It is assumed that each message was submitted to the scheduler as the - first positional argument to the provided callback. + The messages submitted to the scheduler that were not yet dispatched + to their callbacks. + It is assumed that each message was submitted to the scheduler as the + first positional argument to the provided callback. """ raise NotImplementedError -def _make_default_thread_pool_executor(): +def _make_default_thread_pool_executor() -> concurrent.futures.ThreadPoolExecutor: return concurrent.futures.ThreadPoolExecutor( max_workers=10, thread_name_prefix="ThreadPoolExecutor-ThreadScheduler" ) @@ -87,11 +93,14 @@ class ThreadScheduler(Scheduler): This scheduler is useful in typical I/O-bound message processing. Args: - executor(concurrent.futures.ThreadPoolExecutor): An optional executor - to use. If not specified, a default one will be created. + executor: + An optional executor to use. If not specified, a default one + will be created. """ - def __init__(self, executor=None): + def __init__( + self, executor: Optional[concurrent.futures.ThreadPoolExecutor] = None + ): self._queue = queue.Queue() if executor is None: self._executor = _make_default_thread_pool_executor() @@ -104,13 +113,13 @@ def queue(self): and the scheduling thread.""" return self._queue - def schedule(self, callback, *args, **kwargs): + def schedule(self, callback: Callable, *args, **kwargs) -> None: """Schedule the callback to be called asynchronously in a thread pool. Args: - callback (Callable): The function to call. - args: Positional arguments passed to the function. - kwargs: Key-word arguments passed to the function. + callback: The function to call. + args: Positional arguments passed to the callback. + kwargs: Key-word arguments passed to the callback. Returns: None @@ -124,21 +133,22 @@ def schedule(self, callback, *args, **kwargs): stacklevel=2, ) - def shutdown(self, await_msg_callbacks=False): + def shutdown( + self, await_msg_callbacks: bool = False + ) -> List["pubsub_v1.subscriber.message.Message"]: """Shut down the scheduler and immediately end all pending callbacks. Args: - await_msg_callbacks (bool): + await_msg_callbacks: If ``True``, the method will block until all currently executing executor threads are done processing. If ``False`` (default), the method will not wait for the currently running threads to complete. Returns: - List[pubsub_v1.subscriber.message.Message]: - The messages submitted to the scheduler that were not yet dispatched - to their callbacks. - It is assumed that each message was submitted to the scheduler as the - first positional argument to the provided callback. + The messages submitted to the scheduler that were not yet dispatched + to their callbacks. + It is assumed that each message was submitted to the scheduler as the + first positional argument to the provided callback. """ dropped_messages = [] diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index f8aa532a0..0558c2f1e 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -18,6 +18,8 @@ import enum import inspect import sys +import typing +from typing import Dict, NamedTuple import proto @@ -37,34 +39,39 @@ from google.pubsub_v1.types import pubsub as pubsub_gapic_types +if typing.TYPE_CHECKING: # pragma: NO COVER + from types import ModuleType + from google.api_core import retry as retries + from google.pubsub_v1 import types as gapic_types + + # Define the default values for batching. # # This class is used when creating a publisher or subscriber client, and # these settings can be altered to tweak Pub/Sub behavior. # The defaults should be fine for most use cases. -BatchSettings = collections.namedtuple( - "BatchSettings", ["max_bytes", "max_latency", "max_messages"] -) -BatchSettings.__new__.__defaults__ = ( # pytype: disable=attribute-error - 1 * 1000 * 1000, # max_bytes: 1 MB - 0.01, # max_latency: 10 ms - 100, # max_messages: 100 -) -BatchSettings.__doc__ = "The settings for batch publishing the messages." -BatchSettings.max_bytes.__doc__ = ( - "The maximum total size of the messages to collect before automatically " - "publishing the batch, including any byte size overhead of the publish " - "request itself. The maximum value is bound by the server-side limit of " - "10_000_000 bytes." -) -BatchSettings.max_latency.__doc__ = ( - "The maximum number of seconds to wait for additional messages before " - "automatically publishing the batch." -) -BatchSettings.max_messages.__doc__ = ( - "The maximum number of messages to collect before automatically " - "publishing the batch." -) +class BatchSettings(NamedTuple): + """The settings for batch publishing the messages.""" + + max_bytes: int = 1 * 1000 * 1000 # 1 MB + ( + "The maximum total size of the messages to collect before automatically " + "publishing the batch, including any byte size overhead of the publish " + "request itself. The maximum value is bound by the server-side limit of " + "10_000_000 bytes." + ) + + max_latency: float = 0.01 # 10 ms + ( + "The maximum number of seconds to wait for additional messages before " + "automatically publishing the batch." + ) + + max_messages: int = 100 + ( + "The maximum number of messages to collect before automatically " + "publishing the batch." + ) class LimitExceededBehavior(str, enum.Enum): @@ -75,105 +82,89 @@ class LimitExceededBehavior(str, enum.Enum): ERROR = "error" -PublishFlowControl = collections.namedtuple( - "PublishFlowControl", ["message_limit", "byte_limit", "limit_exceeded_behavior"] -) -# pytype: disable=attribute-error -PublishFlowControl.__new__.__defaults__ = ( - 10 * BatchSettings.__new__.__defaults__[2], # message limit - 10 * BatchSettings.__new__.__defaults__[0], # byte limit - LimitExceededBehavior.IGNORE, # desired behavior -) -# pytype: enable=attribute-error -PublishFlowControl.__doc__ = "The client flow control settings for message publishing." -PublishFlowControl.message_limit.__doc__ = ( - "The maximum number of messages awaiting to be published." -) -PublishFlowControl.byte_limit.__doc__ = ( - "The maximum total size of messages awaiting to be published." -) -PublishFlowControl.limit_exceeded_behavior.__doc__ = ( - "The action to take when publish flow control limits are exceeded." -) +class PublishFlowControl(NamedTuple): + """The client flow control settings for message publishing.""" + + message_limit: int = 10 * BatchSettings.__new__.__defaults__[2] + """The maximum number of messages awaiting to be published.""" + + byte_limit: int = 10 * BatchSettings.__new__.__defaults__[0] + """The maximum total size of messages awaiting to be published.""" + + limit_exceeded_behavior: LimitExceededBehavior = LimitExceededBehavior.IGNORE + """The action to take when publish flow control limits are exceeded.""" + # Define the default publisher options. # # This class is used when creating a publisher client to pass in options # to enable/disable features. -PublisherOptions = collections.namedtuple( - "PublisherOptions", ["enable_message_ordering", "flow_control", "retry", "timeout"] -) -PublisherOptions.__new__.__defaults__ = ( # pytype: disable=attribute-error - False, # enable_message_ordering: False - PublishFlowControl(), # default flow control settings - gapic_v1.method.DEFAULT, # use default api_core value for retry - gapic_v1.method.DEFAULT, # use default api_core value for timeout -) -PublisherOptions.__doc__ = "The options for the publisher client." -PublisherOptions.enable_message_ordering.__doc__ = ( - "Whether to order messages in a batch by a supplied ordering key." -) -PublisherOptions.flow_control.__doc__ = ( - "Flow control settings for message publishing by the client. By default " - "the publisher client does not do any throttling." -) -PublisherOptions.retry.__doc__ = ( - "Retry settings for message publishing by the client. This should be " - "an instance of :class:`google.api_core.retry.Retry`." -) -PublisherOptions.timeout.__doc__ = ( - "Timeout settings for message publishing by the client. It should be compatible " - "with :class:`~.pubsub_v1.types.TimeoutType`." -) +class PublisherOptions(NamedTuple): + """The options for the publisher client.""" + + enable_message_ordering: bool = False + """Whether to order messages in a batch by a supplied ordering key.""" + + flow_control: PublishFlowControl = PublishFlowControl() + ( + "Flow control settings for message publishing by the client. By default " + "the publisher client does not do any throttling." + ) + + retry: "retries.Retry" = gapic_v1.method.DEFAULT # use api_core default + ( + "Retry settings for message publishing by the client. This should be " + "an instance of :class:`google.api_core.retry.Retry`." + ) + + timeout: "gapic_types.TimeoutType" = gapic_v1.method.DEFAULT # use api_core default + ( + "Timeout settings for message publishing by the client. It should be " + "compatible with :class:`~.pubsub_v1.types.TimeoutType`." + ) + # Define the type class and default values for flow control settings. # # This class is used when creating a publisher or subscriber client, and # these settings can be altered to tweak Pub/Sub behavior. # The defaults should be fine for most use cases. -FlowControl = collections.namedtuple( - "FlowControl", - [ - "max_bytes", - "max_messages", - "max_lease_duration", - "max_duration_per_lease_extension", - ], -) -FlowControl.__new__.__defaults__ = ( # pytype: disable=attribute-error - 100 * 1024 * 1024, # max_bytes: 100mb - 1000, # max_messages: 1000 - 1 * 60 * 60, # max_lease_duration: 1 hour. - 0, # max_duration_per_lease_extension: disabled -) -FlowControl.__doc__ = ( - "The settings for controlling the rate at which messages are pulled " - "with an asynchronous subscription." -) -FlowControl.max_bytes.__doc__ = ( - "The maximum total size of received - but not yet processed - messages " - "before pausing the message stream." -) -FlowControl.max_messages.__doc__ = ( - "The maximum number of received - but not yet processed - messages before " - "pausing the message stream." -) -FlowControl.max_lease_duration.__doc__ = ( - "The maximum amount of time in seconds to hold a lease on a message " - "before dropping it from the lease management." -) -FlowControl.max_duration_per_lease_extension.__doc__ = ( - "The max amount of time in seconds for a single lease extension attempt. " - "Bounds the delay before a message redelivery if the subscriber " - "fails to extend the deadline. Must be between 10 and 600 (inclusive). Ignored " - "if set to 0." -) +class FlowControl(NamedTuple): + """The settings for controlling the rate at which messages are pulled + with an asynchronous subscription. + """ + + max_bytes: int = 100 * 1024 * 1024 # 100 MiB + ( + "The maximum total size of received - but not yet processed - messages " + "before pausing the message stream." + ) + + max_messages: int = 1000 + ( + "The maximum number of received - but not yet processed - messages before " + "pausing the message stream." + ) + + max_lease_duration: float = 1 * 60 * 60 # 1 hour + ( + "The maximum amount of time in seconds to hold a lease on a message " + "before dropping it from the lease management." + ) + + max_duration_per_lease_extension: float = 0 # disabled by default + ( + "The max amount of time in seconds for a single lease extension attempt. " + "Bounds the delay before a message redelivery if the subscriber " + "fails to extend the deadline. Must be between 10 and 600 (inclusive). Ignored " + "if set to 0." + ) # The current api core helper does not find new proto messages of type proto.Message, # thus we need our own helper. Adjusted from # https://github.com/googleapis/python-api-core/blob/8595f620e7d8295b6a379d6fd7979af3bef717e2/google/api_core/protobuf_helpers.py#L101-L118 -def _get_protobuf_messages(module): +def _get_protobuf_messages(module: "ModuleType") -> Dict[str, proto.Message]: """Discover all protobuf Message classes in a given import module. Args: