diff --git a/UPGRADING.md b/UPGRADING.md index 3837464fc..9ffdb5507 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -100,7 +100,7 @@ specified by the API producer. *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: google.pubsub_v1.types.TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: ``` @@ -161,3 +161,44 @@ The publisher and subscriber clients cannot be constructed with `client_config` argument anymore. If you want to customize retry and timeout settings for a particular method, you need to do it upon method invocation by passing the custom `timeout` and `retry` arguments, respectively. + + +## Custom Retry and Timeout settings for Publisher Client + +The ``publisher_options`` parameter to the Publisher Client, as well as all of the +client's methods, now accept custom retry and timeout settings: + +```py +custom_retry = api_core.retry.Retry( + initial=0.250, # seconds (default: 0.1) + maximum=90.0, # seconds (default: 60.0) + multiplier=1.45, # default: 1.3 + deadline=300.0, # seconds (default: 60.0) + predicate=api_core.retry.if_exception_type( + api_core.exceptions.Aborted, + api_core.exceptions.DeadlineExceeded, + api_core.exceptions.InternalServerError, + api_core.exceptions.ResourceExhausted, + api_core.exceptions.ServiceUnavailable, + api_core.exceptions.Unknown, + api_core.exceptions.Cancelled, + ), +) + +custom_timeout=api_core.timeout.ExponentialTimeout( + initial=1.0, + maximum=10.0, + multiplier=1.0, + deadline=300.0, +) + +publisher = pubsub_v1.PublisherClient( + publisher_options = pubsub_v1.types.PublisherOptions( + retry=custom_retry, + timeout=custom_timeout, + ), +) +``` + +The timeout can be either an instance of `google.api_core.timeout.ConstantTimeout`, +or an instance of `google.api_core.timeout.ExponentialTimeout`, as in the example. diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index b3936c215..e59dff00e 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -73,6 +73,9 @@ class Batch(base.Batch): commit_retry (Optional[google.api_core.retry.Retry]): Designation of what errors, if any, should be retried when commiting the batch. If not provided, a default retry is used. + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout to apply when commiting the batch. If not provided, a + default timeout is used. """ def __init__( @@ -83,6 +86,7 @@ def __init__( batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): self._client = client self._topic = topic @@ -106,6 +110,7 @@ def __init__( self._size = self._base_request_size self._commit_retry = commit_retry + self._commit_timeout = commit_timeout @staticmethod def make_lock(): @@ -261,7 +266,10 @@ def _commit(self): try: # Performs retries for errors defined by the retry configuration. response = self._client.api.publish( - topic=self._topic, messages=self._messages, retry=self._commit_retry + topic=self._topic, + messages=self._messages, + retry=self._commit_retry, + timeout=self._commit_timeout, ) except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/base.py b/google/cloud/pubsub_v1/publisher/_sequencer/base.py index c14b2975d..60a7d269c 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/base.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/base.py @@ -16,6 +16,9 @@ import abc +from google.api_core import gapic_v1 +from google.pubsub_v1 import types as gapic_types + class Sequencer(metaclass=abc.ABCMeta): """The base class for sequencers for Pub/Sub publishing. A sequencer @@ -45,7 +48,12 @@ def unpause(self, message): # pragma: NO COVER @staticmethod @abc.abstractmethod - def publish(self, message, retry=None): # pragma: NO COVER + def publish( + self, + message, + retry=None, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, + ): # pragma: NO COVER """ Publish message for this ordering key. Args: @@ -53,6 +61,8 @@ def publish(self, message, retry=None): # pragma: NO COVER The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. + timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout to apply when publishing the message. Returns: A class instance that conforms to Python Standard library's diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index f7c0be084..83dd0c921 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -21,6 +21,7 @@ from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base from google.cloud.pubsub_v1.publisher._batch import base as batch_base +from google.pubsub_v1 import types as gapic_types class _OrderedSequencerStatus(str, enum.Enum): @@ -226,13 +227,19 @@ def unpause(self): raise RuntimeError("Ordering key is not paused.") self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES - def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): + def _create_batch( + self, + commit_retry=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, + ): """ Create a new batch using the client's batch class and other stored settings. Args: commit_retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the batch. + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout to apply when publishing the batch. """ return self._client._batch_class( client=self._client, @@ -241,9 +248,15 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): batch_done_callback=self._batch_done_callback, commit_when_full=False, commit_retry=commit_retry, + commit_timeout=commit_timeout, ) - def publish(self, message, retry=gapic_v1.method.DEFAULT): + def publish( + self, + message, + retry=gapic_v1.method.DEFAULT, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, + ): """ Publish message for this ordering key. Args: @@ -251,6 +264,8 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. + timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout to apply when publishing the message. Returns: A class instance that conforms to Python Standard library's @@ -287,13 +302,15 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): ), "Publish is only allowed in accepting-messages state." if not self._ordered_batches: - new_batch = self._create_batch(commit_retry=retry) + new_batch = self._create_batch( + commit_retry=retry, commit_timeout=timeout + ) self._ordered_batches.append(new_batch) batch = self._ordered_batches[-1] future = batch.publish(message) while future is None: - batch = self._create_batch(commit_retry=retry) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._ordered_batches.append(batch) future = batch.publish(message) diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index d343ed945..76dd1cad7 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -15,6 +15,7 @@ from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher._sequencer import base +from google.pubsub_v1 import types as gapic_types class UnorderedSequencer(base.Sequencer): @@ -77,13 +78,19 @@ def unpause(self): """ Not relevant for this class. """ raise NotImplementedError - def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): + def _create_batch( + self, + commit_retry=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, + ): """ Create a new batch using the client's batch class and other stored settings. Args: commit_retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the batch. + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout to apply when publishing the batch. """ return self._client._batch_class( client=self._client, @@ -92,9 +99,15 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): batch_done_callback=None, commit_when_full=True, commit_retry=commit_retry, + commit_timeout=commit_timeout, ) - def publish(self, message, retry=gapic_v1.method.DEFAULT): + def publish( + self, + message, + retry=gapic_v1.method.DEFAULT, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, + ): """ Batch message into existing or new batch. Args: @@ -102,6 +115,8 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. + timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout to apply when publishing the message. Returns: ~google.api_core.future.Future: An object conforming to @@ -119,7 +134,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): raise RuntimeError("Unordered sequencer already stopped.") if not self._current_batch: - newbatch = self._create_batch(commit_retry=retry) + newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._current_batch = newbatch batch = self._current_batch @@ -129,7 +144,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): future = batch.publish(message) # batch is full, triggering commit_when_full if future is None: - batch = self._create_batch(commit_retry=retry) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) # At this point, we lose track of the old batch, but we don't # care since it's already committed (because it was full.) self._current_batch = batch diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 4703cc3c4..e35832653 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -230,7 +230,13 @@ def resume_publish(self, topic, ordering_key): sequencer.unpause() def publish( - self, topic, data, ordering_key="", retry=gapic_v1.method.DEFAULT, **attrs + self, + topic, + data, + ordering_key="", + retry=gapic_v1.method.DEFAULT, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, + **attrs ): """Publish a single message. @@ -269,6 +275,12 @@ def publish( retry (Optional[google.api_core.retry.Retry]): Designation of what errors, if any, should be retried. If `ordering_key` is specified, the total retry deadline will be changed to "infinity". + If given, it overides any retry passed into the client through + the ``publisher_options`` argument. + timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout for the RPC request. Can be used to override any timeout + passed in through ``publisher_options`` when instantiating the client. + attrs (Mapping[str, str]): A dictionary of attributes to be sent as metadata. (These may be text strings or byte strings.) @@ -331,6 +343,12 @@ def publish( def on_publish_done(future): self._flow_controller.release(message) + if retry is gapic_v1.method.DEFAULT: # if custom retry not passed in + retry = self.publisher_options.retry + + if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in + timeout = self.publisher_options.timeout + with self._batch_lock: if self._is_stopped: raise RuntimeError("Cannot publish on a stopped publisher.") @@ -347,7 +365,7 @@ def on_publish_done(future): # Delegate the publishing to the sequencer. sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry) + future = sequencer.publish(message, retry=retry, timeout=timeout) future.add_done_callback(on_publish_done) # Create a timer thread if necessary to enforce the batching diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index d72541a3b..5fc7dd581 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -22,6 +22,7 @@ import proto from google.api import http_pb2 +from google.api_core import gapic_v1 from google.iam.v1 import iam_policy_pb2 from google.iam.v1 import policy_pb2 from google.iam.v1.logging import audit_data_pb2 @@ -98,11 +99,13 @@ class LimitExceededBehavior(str, enum.Enum): # This class is used when creating a publisher client to pass in options # to enable/disable features. PublisherOptions = collections.namedtuple( - "PublisherConfig", ["enable_message_ordering", "flow_control"] + "PublisherOptions", ["enable_message_ordering", "flow_control", "retry", "timeout"] ) PublisherOptions.__new__.__defaults__ = ( False, # enable_message_ordering: False PublishFlowControl(), # default flow control settings + gapic_v1.method.DEFAULT, # use default api_core value for retry + gapic_v1.method.DEFAULT, # use default api_core value for timeout ) PublisherOptions.__doc__ = "The options for the publisher client." PublisherOptions.enable_message_ordering.__doc__ = ( @@ -112,6 +115,14 @@ class LimitExceededBehavior(str, enum.Enum): "Flow control settings for message publishing by the client. By default " "the publisher client does not do any throttling." ) +PublisherOptions.retry.__doc__ = ( + "Retry settings for message publishing by the client. This should be " + "an instance of :class:`google.api_core.retry.Retry`." +) +PublisherOptions.timeout.__doc__ = ( + "Timeout settings for message publishing by the client. It should be compatible " + "with :class:`~.pubsub_v1.types.TimeoutType`." +) # Define the type class and default values for flow control settings. # diff --git a/google/pubsub_v1/services/publisher/async_client.py b/google/pubsub_v1/services/publisher/async_client.py index d644364b2..041391c57 100644 --- a/google/pubsub_v1/services/publisher/async_client.py +++ b/google/pubsub_v1/services/publisher/async_client.py @@ -25,6 +25,7 @@ from google.api_core import exceptions # type: ignore from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore +from google.api_core import timeout as timeouts # type: ignore from google.auth import credentials # type: ignore from google.oauth2 import service_account # type: ignore @@ -32,7 +33,7 @@ from google.iam.v1 import policy_pb2 as policy # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub - +from google.pubsub_v1.types import TimeoutType from .transports.base import PublisherTransport, DEFAULT_CLIENT_INFO from .transports.grpc_asyncio import PublisherGrpcAsyncIOTransport from .client import PublisherClient @@ -177,7 +178,7 @@ async def create_topic( *, name: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Creates the given topic with the given name. See the [resource @@ -203,7 +204,8 @@ async def create_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -261,7 +263,7 @@ async def update_topic( request: pubsub.UpdateTopicRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Updates an existing topic. Note that certain @@ -270,10 +272,10 @@ async def update_topic( Args: request (:class:`google.pubsub_v1.types.UpdateTopicRequest`): The request object. Request for the UpdateTopic method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -321,7 +323,7 @@ async def publish( topic: str = None, messages: Sequence[pubsub.PubsubMessage] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -343,10 +345,10 @@ async def publish( This corresponds to the ``messages`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -416,7 +418,7 @@ async def get_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Gets the configuration of a topic. @@ -431,10 +433,10 @@ async def get_topic( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -497,7 +499,7 @@ async def list_topics( *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsAsyncPager: r"""Lists matching topics. @@ -512,10 +514,10 @@ async def list_topics( This corresponds to the ``project`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -588,7 +590,7 @@ async def list_topic_subscriptions( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSubscriptionsAsyncPager: r"""Lists the names of the attached subscriptions on this @@ -606,10 +608,10 @@ async def list_topic_subscriptions( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -682,7 +684,7 @@ async def list_topic_snapshots( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSnapshotsAsyncPager: r"""Lists the names of the snapshots on this topic. Snapshots are @@ -704,10 +706,10 @@ async def list_topic_snapshots( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -780,7 +782,7 @@ async def delete_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if @@ -801,10 +803,10 @@ async def delete_topic( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. """ @@ -857,7 +859,7 @@ async def detach_subscription( request: pubsub.DetachSubscriptionRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.DetachSubscriptionResponse: r"""Detaches a subscription from this topic. All messages retained @@ -870,10 +872,10 @@ async def detach_subscription( request (:class:`google.pubsub_v1.types.DetachSubscriptionRequest`): The request object. Request for the DetachSubscription method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -921,7 +923,7 @@ async def set_iam_policy( request: iam_policy.SetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Sets the IAM access control policy on the specified @@ -932,7 +934,8 @@ async def set_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -1027,7 +1030,7 @@ async def get_iam_policy( request: iam_policy.GetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Gets the IAM access control policy for a function. @@ -1039,7 +1042,8 @@ async def get_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -1134,7 +1138,7 @@ async def test_iam_permissions( request: iam_policy.TestIamPermissionsRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> iam_policy.TestIamPermissionsResponse: r"""Tests the specified permissions against the IAM access control @@ -1146,7 +1150,8 @@ async def test_iam_permissions( `TestIamPermissions` method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index f74e85a0f..3f249b01b 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -27,6 +27,7 @@ from google.api_core import exceptions # type: ignore from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore +from google.api_core import timeout as timeouts # type: ignore from google.auth import credentials # type: ignore from google.auth.transport import mtls # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore @@ -37,6 +38,7 @@ from google.iam.v1 import policy_pb2 as policy # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub +from google.pubsub_v1.types import TimeoutType import grpc @@ -401,7 +403,7 @@ def create_topic( *, name: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Creates the given topic with the given name. See the [resource @@ -428,7 +430,8 @@ def create_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -480,7 +483,7 @@ def update_topic( request: pubsub.UpdateTopicRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Updates an existing topic. Note that certain @@ -493,7 +496,8 @@ def update_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -535,7 +539,7 @@ def publish( topic: str = None, messages: Sequence[pubsub.PubsubMessage] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -561,7 +565,8 @@ def publish( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -616,7 +621,7 @@ def get_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Gets the configuration of a topic. @@ -635,7 +640,8 @@ def get_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -688,7 +694,7 @@ def list_topics( *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: r"""Lists matching topics. @@ -707,7 +713,8 @@ def list_topics( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -770,7 +777,7 @@ def list_topic_subscriptions( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSubscriptionsPager: r"""Lists the names of the attached subscriptions on this @@ -789,10 +796,10 @@ def list_topic_subscriptions( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -855,7 +862,7 @@ def list_topic_snapshots( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSnapshotsPager: r"""Lists the names of the snapshots on this topic. Snapshots are @@ -881,7 +888,8 @@ def list_topic_snapshots( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -944,7 +952,7 @@ def delete_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if @@ -969,7 +977,8 @@ def delete_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. """ @@ -1016,7 +1025,7 @@ def detach_subscription( request: pubsub.DetachSubscriptionRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.DetachSubscriptionResponse: r"""Detaches a subscription from this topic. All messages retained @@ -1033,7 +1042,8 @@ def detach_subscription( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1075,7 +1085,7 @@ def set_iam_policy( request: iam_policy.SetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Sets the IAM access control policy on the specified @@ -1087,7 +1097,8 @@ def set_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -1185,7 +1196,7 @@ def get_iam_policy( request: iam_policy.GetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Gets the IAM access control policy for a function. @@ -1196,9 +1207,8 @@ def get_iam_policy( request (:class:`~.iam_policy.GetIamPolicyRequest`): The request object. Request message for `GetIamPolicy` method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, - should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -1296,7 +1306,7 @@ def test_iam_permissions( request: iam_policy.TestIamPermissionsRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> iam_policy.TestIamPermissionsResponse: r"""Tests the specified permissions against the IAM access control @@ -1307,9 +1317,8 @@ def test_iam_permissions( request (:class:`~.iam_policy.TestIamPermissionsRequest`): The request object. Request message for `TestIamPermissions` method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, - should be retried. - timeout (float): The timeout for this request. + timeout (TimeoutType): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: diff --git a/google/pubsub_v1/types/__init__.py b/google/pubsub_v1/types/__init__.py index 2894f6668..ebc8b5399 100644 --- a/google/pubsub_v1/types/__init__.py +++ b/google/pubsub_v1/types/__init__.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - # Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from typing import Union from .pubsub import ( AcknowledgeRequest, @@ -76,7 +76,16 @@ SchemaView, ) +TimeoutType = Union[ + int, + float, + "google.api_core.timeout.ConstantTimeout", + "google.api_core.timeout.ExponentialTimeout", +] +"""The type of the timeout parameter of publisher client methods.""" + __all__ = ( + "TimeoutType", "AcknowledgeRequest", "CreateSnapshotRequest", "DeadLetterPolicy", diff --git a/synth.py b/synth.py index 2ad5d20de..41b63e89e 100644 --- a/synth.py +++ b/synth.py @@ -248,6 +248,52 @@ "\n\g<0>", ) +# Allow timeout to be an instance of google.api_core.timeout.* +s.replace( + "google/pubsub_v1/types/__init__.py", + r"from \.pubsub import \(", + "from typing import Union\n\n\g<0>" +) +s.replace( + "google/pubsub_v1/types/__init__.py", + r"__all__ = \(\n", + textwrap.dedent('''\ + TimeoutType = Union[ + int, + float, + "google.api_core.timeout.ConstantTimeout", + "google.api_core.timeout.ExponentialTimeout", + ] + """The type of the timeout parameter of publisher client methods.""" + + \g<0> "TimeoutType",''') +) + +s.replace( + "google/pubsub_v1/services/publisher/*client.py", + r"from google.api_core import retry as retries.*\n", + "\g<0>from google.api_core import timeout as timeouts # type: ignore\n" +) +s.replace( + "google/pubsub_v1/services/publisher/*client.py", + r"from google\.pubsub_v1\.types import pubsub", + "\g<0>\nfrom google.pubsub_v1.types import TimeoutType", +) + +s.replace( + "google/pubsub_v1/services/publisher/*client.py", + r"(\s+)timeout: float = None.*\n", + "\g<1>timeout: TimeoutType = gapic_v1.method.DEFAULT,", +) +s.replace( + "google/pubsub_v1/services/publisher/*client.py", + r"([^\S\r\n]+)timeout \(float\): (.*)\n", + ( + "\g<1>timeout (TimeoutType):\n" + "\g<1> \g<2>\n" + ), +) + # The namespace package declaration in google/cloud/__init__.py should be excluded # from coverage. s.replace( diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 1f1850ad2..abf5ec76f 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -42,6 +42,7 @@ def create_batch( batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, **batch_settings ): """Return a batch object suitable for testing. @@ -54,6 +55,8 @@ def create_batch( has reached byte-size or number-of-messages limits. commit_retry (Optional[google.api_core.retry.Retry]): The retry settings for the batch commit call. + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): + The timeout to apply to the batch commit call. batch_settings (Mapping[str, str]): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. @@ -69,6 +72,7 @@ def create_batch( batch_done_callback=batch_done_callback, commit_when_full=commit_when_full, commit_retry=commit_retry, + commit_timeout=commit_timeout, ) @@ -138,6 +142,7 @@ def test_blocking__commit(): gapic_types.PubsubMessage(data=b"This is another message."), ], retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, ) # Establish that all of the futures are done, and that they have the @@ -166,6 +171,29 @@ def test_blocking__commit_custom_retry(): topic="topic_name", messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=mock.sentinel.custom_retry, + timeout=gapic_v1.method.DEFAULT, + ) + + +def test_blocking__commit_custom_timeout(): + batch = create_batch(commit_timeout=mock.sentinel.custom_timeout) + batch.publish({"data": b"This is my message."}) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a"]) + patch = mock.patch.object( + type(batch.client.api), "publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[gapic_types.PubsubMessage(data=b"This is my message.")], + retry=gapic_v1.method.DEFAULT, + timeout=mock.sentinel.custom_timeout, ) @@ -173,7 +201,7 @@ def test_client_api_publish_not_blocking_additional_publish_calls(): batch = create_batch(max_messages=1) api_publish_called = threading.Event() - def api_publish_delay(topic="", messages=(), retry=None): + def api_publish_delay(topic="", messages=(), retry=None, timeout=None): api_publish_called.set() time.sleep(1.0) message_ids = [str(i) for i in range(len(messages))] diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py index de5dd0523..09795d37b 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py @@ -184,6 +184,18 @@ def test_publish_custom_retry(): assert batch._commit_retry is mock.sentinel.custom_retry +def test_publish_custom_timeout(): + client = create_client() + message = create_message() + sequencer = create_ordered_sequencer(client) + + sequencer.publish(message, timeout=mock.sentinel.custom_timeout) + + assert sequencer._ordered_batches # batch exists + batch = sequencer._ordered_batches[0] + assert batch._commit_timeout is mock.sentinel.custom_timeout + + def test_publish_batch_full(): client = create_client() message = create_message() diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py index 04a89e19b..486cba5f7 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py @@ -101,6 +101,17 @@ def test_publish_custom_retry(): assert sequencer._current_batch._commit_retry is mock.sentinel.custom_retry +def test_publish_custom_timeout(): + client = create_client() + message = create_message() + sequencer = unordered_sequencer.UnorderedSequencer(client, "topic_name") + + sequencer.publish(message, timeout=mock.sentinel.custom_timeout) + + assert sequencer._current_batch is not None + assert sequencer._current_batch._commit_timeout is mock.sentinel.custom_timeout + + def test_publish_batch_full(): client = create_client() message = create_message() diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 3db5d60cd..161f9e33b 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -337,6 +337,7 @@ def test_publish_new_batch_needed(creds): batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, + commit_timeout=gapic_v1.method.DEFAULT, ) message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) batch1.publish.assert_called_once_with(message_pb) @@ -350,6 +351,44 @@ def test_publish_attrs_type_error(creds): client.publish(topic, b"foo", answer=42) +def test_publish_custom_retry_overrides_configured_retry(creds): + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions(retry=mock.sentinel.publish_retry), + ) + + topic = "topic/path" + client._flow_controller = mock.Mock() + fake_sequencer = mock.Mock() + client._get_or_create_sequencer = mock.Mock(return_value=fake_sequencer) + client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) + + fake_sequencer.publish.assert_called_once_with( + mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY + ) + message = fake_sequencer.publish.call_args.args[0] + assert message.data == b"hello!" + + +def test_publish_custom_timeout_overrides_configured_timeout(creds): + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions(timeout=mock.sentinel.publish_timeout), + ) + + topic = "topic/path" + client._flow_controller = mock.Mock() + fake_sequencer = mock.Mock() + client._get_or_create_sequencer = mock.Mock(return_value=fake_sequencer) + client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) + + fake_sequencer.publish.assert_called_once_with( + mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout + ) + message = fake_sequencer.publish.call_args.args[0] + assert message.data == b"hello!" + + def test_stop(creds): client = publisher.Client(credentials=creds)