From 729a5e7e856a30cc81b978248f3110509a01206c Mon Sep 17 00:00:00 2001 From: Carlos de la Guardia Date: Mon, 16 Nov 2020 01:02:11 -0600 Subject: [PATCH 01/14] feat: allow retry and timeout settings on publisher client --- google/cloud/pubsub_v1/publisher/client.py | 8 ++- google/cloud/pubsub_v1/types.py | 14 +++++- google/pubsub_v1/services/publisher/client.py | 49 ++++++++++++++----- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 6a9418e69..365844cd1 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -272,7 +272,13 @@ def resume_publish(self, topic, ordering_key): sequencer.unpause() def publish( - self, topic, data, ordering_key="", retry=gapic_v1.method.DEFAULT, **attrs + self, + topic, + data, + ordering_key="", + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + **attrs ): """Publish a single message. diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index b875f3cd2..573897cc5 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -22,6 +22,7 @@ import proto from google.api import http_pb2 +from google.api_core import gapic_v1 from google.iam.v1 import iam_policy_pb2 from google.iam.v1 import policy_pb2 from google.iam.v1.logging import audit_data_pb2 @@ -98,11 +99,13 @@ class LimitExceededBehavior(str, enum.Enum): # This class is used when creating a publisher client to pass in options # to enable/disable features. PublisherOptions = collections.namedtuple( - "PublisherConfig", ["enable_message_ordering", "flow_control"] + "PublisherConfig", ["enable_message_ordering", "flow_control", "retry", "timeout"] ) PublisherOptions.__new__.__defaults__ = ( False, # enable_message_ordering: False PublishFlowControl(), # default flow control settings + gapic_v1.method.DEFAULT, # use default api_core value for retry + gapic_v1.method.DEFAULT, # use default api_core value for timeout ) PublisherOptions.__doc__ = "The options for the publisher client." PublisherOptions.enable_message_ordering.__doc__ = ( @@ -115,6 +118,15 @@ class LimitExceededBehavior(str, enum.Enum): "Flow control settings for message publishing by the client. By default " "the publisher client does not do any throttling." ) +PublisherOptions.retry.__doc__ = ( + "Retry settings for message publishing by the client. This should be " + "an instance of api.core.retry.Retry." +) +PublisherOptions.timeout.__doc__ = ( + "Timeout settings for message publishing by the client. This should be " + "an instance of api.core.timeout.ConstantTimeout or an instance of " + "api.core.timeout.ExponentialTimeout." +) # Define the type class and default values for flow control settings. # diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 188b3dccb..f618103f4 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -26,6 +26,7 @@ from google.api_core import exceptions # type: ignore from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore +from google.api_core import timeout as timeouts # type: ignore from google.auth import credentials # type: ignore from google.auth.transport import mtls # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore @@ -363,7 +364,9 @@ def create_topic( *, name: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Creates the given topic with the given name. See the `resource @@ -441,7 +444,9 @@ def update_topic( request: pubsub.UpdateTopicRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Updates an existing topic. Note that certain @@ -496,7 +501,9 @@ def publish( topic: str = None, messages: Sequence[pubsub.PubsubMessage] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -577,7 +584,9 @@ def get_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Gets the configuration of a topic. @@ -648,7 +657,9 @@ def list_topics( *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: r"""Lists matching topics. @@ -729,7 +740,9 @@ def list_topic_subscriptions( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSubscriptionsPager: r"""Lists the names of the attached subscriptions on this @@ -813,7 +826,9 @@ def list_topic_snapshots( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSnapshotsPager: r"""Lists the names of the snapshots on this topic. Snapshots are @@ -901,7 +916,9 @@ def delete_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if @@ -972,7 +989,9 @@ def detach_subscription( request: pubsub.DetachSubscriptionRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.DetachSubscriptionResponse: r"""Detaches a subscription from this topic. All messages retained @@ -1031,7 +1050,9 @@ def set_iam_policy( request: iam_policy.SetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Sets the IAM access control policy on the specified @@ -1141,7 +1162,9 @@ def get_iam_policy( request: iam_policy.GetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Gets the IAM access control policy for a function. @@ -1252,7 +1275,9 @@ def test_iam_permissions( request: iam_policy.TestIamPermissionsRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> iam_policy.TestIamPermissionsResponse: r"""Tests the specified permissions against the IAM access control From a9b5190f340404232050fa1ac8416e8279dc7c7d Mon Sep 17 00:00:00 2001 From: Carlos de la Guardia Date: Mon, 16 Nov 2020 03:23:16 -0600 Subject: [PATCH 02/14] build: update generated code and update doc --- UPGRADING.md | 46 +++++++++++++++++++++++++++++++++++++++++++++- synth.py | 15 +++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/UPGRADING.md b/UPGRADING.md index 3837464fc..2238a0c8d 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -100,7 +100,10 @@ specified by the API producer. *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + google.api_core.timeout.ConstantTimeout, + google.api_core.timeout.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: ``` @@ -161,3 +164,44 @@ The publisher and subscriber clients cannot be constructed with `client_config` argument anymore. If you want to customize retry and timeout settings for a particular method, you need to do it upon method invocation by passing the custom `timeout` and `retry` arguments, respectively. + + +## Custom Retry and Timeout settings for Publisher Client + +The ``publisher_options`` parameter to the Publisher Client, as well as all of the +client's methods, now accept custom retry and timeout settings: + +```py +custom_retry = api_core.retry.Retry( + initial=0.250, # seconds (default: 0.1) + maximum=90.0, # seconds (default: 60.0) + multiplier=1.45, # default: 1.3 + deadline=300.0, # seconds (default: 60.0) + predicate=api_core.retry.if_exception_type( + api_core.exceptions.Aborted, + api_core.exceptions.DeadlineExceeded, + api_core.exceptions.InternalServerError, + api_core.exceptions.ResourceExhausted, + api_core.exceptions.ServiceUnavailable, + api_core.exceptions.Unknown, + api_core.exceptions.Cancelled, + ), +) + +custom_timeout=api_core.timeout.ExponentialTimeout( + initial=1.0, + maximum=10.0, + multiplier=1.0, + deadline=300.0, +) + +publisher = pubsub_v1.PublisherClient( + publisher_options = pubsub_v1.types.PublisherOptions( + retry=custom_retry, + timeout=custom_timeout, + ), +) +``` + +The timeout can be either an instance of `google.api_core.timeout.ConstantTimeout`, +or an instance of `google.api_core.timeout.ExponentialTimeout`, as in the example. diff --git a/synth.py b/synth.py index fe1b0838e..cf6b8ed72 100644 --- a/synth.py +++ b/synth.py @@ -104,6 +104,21 @@ "\n\g<0>", ) +# Allow timeout to be an instance of google.api_core.timeout.* +s.replace( + "google/pubsub_v1/services/publisher/client.py", + r"from google.api_core import retry as retries.*\n", + "\g<0>from google.api_core import timeout as timeouts # type: ignore\n" +) +s.replace( + "google/pubsub_v1/services/publisher/client.py", + r"(\s+)timeout: float = None.*\n", + """ + \g<1>timeout: Union[ + \g<1> timeouts.ConstantTimeout, timeouts.ExponentialTimeout + \g<1>] = gapic_v1.method.DEFAULT,""", +) + # ---------------------------------------------------------------------------- # Add templated files # ---------------------------------------------------------------------------- From 12f2140df54a58202a4d8ddbabae4424b5b576ef Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 26 Feb 2021 12:13:06 +0100 Subject: [PATCH 03/14] Propagate publish timeout to the RPC call --- .../pubsub_v1/publisher/_batch/thread.py | 13 +++++++- .../pubsub_v1/publisher/_sequencer/base.py | 7 +++- .../publisher/_sequencer/ordered_sequencer.py | 27 ++++++++++++--- .../_sequencer/unordered_sequencer.py | 25 +++++++++++--- google/cloud/pubsub_v1/publisher/client.py | 18 +++++++++- .../pubsub_v1/publisher/batch/test_thread.py | 33 ++++++++++++++++++- .../sequencer/test_ordered_sequencer.py | 12 +++++++ .../sequencer/test_unordered_sequencer.py | 11 +++++++ .../publisher/test_publisher_client.py | 1 + 9 files changed, 135 insertions(+), 12 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index fc4e6ba6d..eb84cc0d9 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -75,6 +75,12 @@ class Batch(base.Batch): commit_retry (Optional[google.api_core.retry.Retry]): Designation of what errors, if any, should be retried when commiting the batch. If not provided, a default retry is used. + commit_timeout (Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]): + The timeout to apply when commiting the batch. If not provided, a + default timeout is used. """ def __init__( @@ -85,6 +91,7 @@ def __init__( batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, + commit_timeout=gapic_v1.method.DEFAULT, ): self._client = client self._topic = topic @@ -108,6 +115,7 @@ def __init__( self._size = self._base_request_size self._commit_retry = commit_retry + self._commit_timeout = commit_timeout @staticmethod def make_lock(): @@ -261,7 +269,10 @@ def _commit(self): try: # Performs retries for errors defined by the retry configuration. response = self._client.api.publish( - topic=self._topic, messages=self._messages, retry=self._commit_retry + topic=self._topic, + messages=self._messages, + retry=self._commit_retry, + timeout=self._commit_timeout, ) except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/base.py b/google/cloud/pubsub_v1/publisher/_sequencer/base.py index 3cfa809f7..e57efff35 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/base.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/base.py @@ -48,7 +48,7 @@ def unpause(self, message): @staticmethod @abc.abstractmethod - def publish(self, message, retry=None): + def publish(self, message, retry=None, timeout=None): """ Publish message for this ordering key. Args: @@ -56,6 +56,11 @@ def publish(self, message, retry=None): The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. + timeout (Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]): + The timeout to apply when publishing the message. Returns: A class instance that conforms to Python Standard library's diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index f7c0be084..1848d676d 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -226,13 +226,22 @@ def unpause(self): raise RuntimeError("Ordering key is not paused.") self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES - def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): + def _create_batch( + self, + commit_retry=gapic_v1.method.DEFAULT, + commit_timeout=gapic_v1.method.DEFAULT, + ): """ Create a new batch using the client's batch class and other stored settings. Args: commit_retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the batch. + commit_timeout (Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]): + The timeout to apply when publishing the batch. """ return self._client._batch_class( client=self._client, @@ -241,9 +250,12 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): batch_done_callback=self._batch_done_callback, commit_when_full=False, commit_retry=commit_retry, + commit_timeout=commit_timeout, ) - def publish(self, message, retry=gapic_v1.method.DEFAULT): + def publish( + self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT + ): """ Publish message for this ordering key. Args: @@ -251,6 +263,11 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. + timeout (Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]): + The timeout to apply when publishing the message. Returns: A class instance that conforms to Python Standard library's @@ -287,13 +304,15 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): ), "Publish is only allowed in accepting-messages state." if not self._ordered_batches: - new_batch = self._create_batch(commit_retry=retry) + new_batch = self._create_batch( + commit_retry=retry, commit_timeout=timeout + ) self._ordered_batches.append(new_batch) batch = self._ordered_batches[-1] future = batch.publish(message) while future is None: - batch = self._create_batch(commit_retry=retry) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._ordered_batches.append(batch) future = batch.publish(message) diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index d343ed945..627f1177d 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -77,13 +77,22 @@ def unpause(self): """ Not relevant for this class. """ raise NotImplementedError - def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): + def _create_batch( + self, + commit_retry=gapic_v1.method.DEFAULT, + commit_timeout=gapic_v1.method.DEFAULT, + ): """ Create a new batch using the client's batch class and other stored settings. Args: commit_retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the batch. + commit_timeout (Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]): + The timeout to apply when publishing the batch. """ return self._client._batch_class( client=self._client, @@ -92,9 +101,12 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT): batch_done_callback=None, commit_when_full=True, commit_retry=commit_retry, + commit_timeout=commit_timeout, ) - def publish(self, message, retry=gapic_v1.method.DEFAULT): + def publish( + self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT + ): """ Batch message into existing or new batch. Args: @@ -102,6 +114,11 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. + timeout (Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]): + The timeout to apply when publishing the message. Returns: ~google.api_core.future.Future: An object conforming to @@ -119,7 +136,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): raise RuntimeError("Unordered sequencer already stopped.") if not self._current_batch: - newbatch = self._create_batch(commit_retry=retry) + newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._current_batch = newbatch batch = self._current_batch @@ -129,7 +146,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT): future = batch.publish(message) # batch is full, triggering commit_when_full if future is None: - batch = self._create_batch(commit_retry=retry) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) # At this point, we lose track of the old batch, but we don't # care since it's already committed (because it was full.) self._current_batch = batch diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 05c82d7ac..29ce2e72e 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -282,6 +282,16 @@ def publish( retry (Optional[google.api_core.retry.Retry]): Designation of what errors, if any, should be retried. If `ordering_key` is specified, the total retry deadline will be changed to "infinity". + If given, it overides any retry passed into the client through + the ``publisher_options`` argument. + + timeout (Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]): + The timeout for the RPC request. Can be used to override any timeout + passed in through ``publisher_options`` when instantiating the client. + attrs (Mapping[str, str]): A dictionary of attributes to be sent as metadata. (These may be text strings or byte strings.) @@ -344,6 +354,12 @@ def publish( def on_publish_done(future): self._flow_controller.release(message) + if retry is gapic_v1.method.DEFAULT: # if custom retry not passed in + retry = self.publisher_options.retry + + if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in + timeout = self.publisher_options.timeout + with self._batch_lock: if self._is_stopped: raise RuntimeError("Cannot publish on a stopped publisher.") @@ -360,7 +376,7 @@ def on_publish_done(future): # Delegate the publishing to the sequencer. sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry) + future = sequencer.publish(message, retry=retry, timeout=timeout) future.add_done_callback(on_publish_done) # Create a timer thread if necessary to enforce the batching diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index cd634f8f8..81ddc09ca 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -42,6 +42,7 @@ def create_batch( batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, + commit_timeout=gapic_v1.method.DEFAULT, **batch_settings ): """Return a batch object suitable for testing. @@ -54,6 +55,11 @@ def create_batch( has reached byte-size or number-of-messages limits. commit_retry (Optional[google.api_core.retry.Retry]): The retry settings for the batch commit call. + commit_timeout (Optional[Union[ \ + googole.api_core.timeout.ConstantTimeout, \ + googole.api_core.timeout.ExponentialTimeout \ + ]]): + The timeout to apply to the batch commit call. batch_settings (Mapping[str, str]): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. @@ -69,6 +75,7 @@ def create_batch( batch_done_callback=batch_done_callback, commit_when_full=commit_when_full, commit_retry=commit_retry, + commit_timeout=commit_timeout, ) @@ -138,6 +145,7 @@ def test_blocking__commit(): gapic_types.PubsubMessage(data=b"This is another message."), ], retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, ) # Establish that all of the futures are done, and that they have the @@ -166,6 +174,29 @@ def test_blocking__commit_custom_retry(): topic="topic_name", messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=mock.sentinel.custom_retry, + timeout=gapic_v1.method.DEFAULT, + ) + + +def test_blocking__commit_custom_timeout(): + batch = create_batch(commit_timeout=mock.sentinel.custom_timeout) + batch.publish({"data": b"This is my message."}) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a"]) + patch = mock.patch.object( + type(batch.client.api), "publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[gapic_types.PubsubMessage(data=b"This is my message.")], + retry=gapic_v1.method.DEFAULT, + timeout=mock.sentinel.custom_timeout, ) @@ -173,7 +204,7 @@ def test_client_api_publish_not_blocking_additional_publish_calls(): batch = create_batch(max_messages=1) api_publish_called = threading.Event() - def api_publish_delay(topic="", messages=(), retry=None): + def api_publish_delay(topic="", messages=(), retry=None, timeout=None): api_publish_called.set() time.sleep(1.0) message_ids = [str(i) for i in range(len(messages))] diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py index de5dd0523..09795d37b 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py @@ -184,6 +184,18 @@ def test_publish_custom_retry(): assert batch._commit_retry is mock.sentinel.custom_retry +def test_publish_custom_timeout(): + client = create_client() + message = create_message() + sequencer = create_ordered_sequencer(client) + + sequencer.publish(message, timeout=mock.sentinel.custom_timeout) + + assert sequencer._ordered_batches # batch exists + batch = sequencer._ordered_batches[0] + assert batch._commit_timeout is mock.sentinel.custom_timeout + + def test_publish_batch_full(): client = create_client() message = create_message() diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py index b8aff0d2c..3140f71f6 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py @@ -100,6 +100,17 @@ def test_publish_custom_retry(): assert sequencer._current_batch._commit_retry is mock.sentinel.custom_retry +def test_publish_custom_timeout(): + client = create_client() + message = create_message() + sequencer = unordered_sequencer.UnorderedSequencer(client, "topic_name") + + sequencer.publish(message, timeout=mock.sentinel.custom_timeout) + + assert sequencer._current_batch is not None + assert sequencer._current_batch._commit_timeout is mock.sentinel.custom_timeout + + def test_publish_batch_full(): client = create_client() message = create_message() diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 71b432aa7..2be4c113c 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -359,6 +359,7 @@ def test_publish_new_batch_needed(): batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, + commit_timeout=gapic_v1.method.DEFAULT, ) message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) batch1.publish.assert_called_once_with(message_pb) From 240f992f0ef2dc76a20661e05246e55dfae9f87a Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 26 Feb 2021 23:20:20 +0100 Subject: [PATCH 04/14] test publisher client --- .../publisher/test_publisher_client.py | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 2be4c113c..849fb65d7 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -374,6 +374,46 @@ def test_publish_attrs_type_error(): client.publish(topic, b"foo", answer=42) +def test_publish_custom_retry_overrides_configured_retry(): + creds = mock.Mock(spec=credentials.Credentials) + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions(retry=mock.sentinel.publish_retry), + ) + + topic = "topic/path" + client._flow_controller = mock.Mock() + fake_sequencer = mock.Mock() + client._get_or_create_sequencer = mock.Mock(return_value=fake_sequencer) + client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) + + fake_sequencer.publish.assert_called_once_with( + mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY + ) + message = fake_sequencer.publish.call_args.args[0] + assert message.data == b"hello!" + + +def test_publish_custom_timeout_overrides_configured_timeout(): + creds = mock.Mock(spec=credentials.Credentials) + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions(timeout=mock.sentinel.publish_timeout), + ) + + topic = "topic/path" + client._flow_controller = mock.Mock() + fake_sequencer = mock.Mock() + client._get_or_create_sequencer = mock.Mock(return_value=fake_sequencer) + client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) + + fake_sequencer.publish.assert_called_once_with( + mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout + ) + message = fake_sequencer.publish.call_args.args[0] + assert message.data == b"hello!" + + def test_stop(): creds = mock.Mock(spec=credentials.Credentials) client = publisher.Client(credentials=creds) From 272ccdc252dfe07fd5c00f6041533c7c49d52dcf Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 26 Feb 2021 13:16:59 +0100 Subject: [PATCH 05/14] Fix timeout parameter type in docstrings --- google/pubsub_v1/services/publisher/client.py | 36 ++++++++++++------- synth.py | 8 +++++ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index ec2b4e057..5c0dba13a 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -405,7 +405,8 @@ def create_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -472,7 +473,8 @@ def update_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -541,7 +543,8 @@ def publish( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -617,7 +620,8 @@ def get_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -690,7 +694,8 @@ def list_topics( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -776,7 +781,8 @@ def list_topic_subscriptions( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -866,7 +872,8 @@ def list_topic_snapshots( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -955,7 +962,8 @@ def delete_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. """ @@ -1021,7 +1029,8 @@ def detach_subscription( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1077,7 +1086,8 @@ def set_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -1190,7 +1200,8 @@ def get_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -1303,7 +1314,8 @@ def test_iam_permissions( `TestIamPermissions` method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: diff --git a/synth.py b/synth.py index c76f12b55..0fa438de5 100644 --- a/synth.py +++ b/synth.py @@ -147,6 +147,14 @@ \g<1> timeouts.ConstantTimeout, timeouts.ExponentialTimeout \g<1>] = gapic_v1.method.DEFAULT,""", ) +s.replace( + "google/pubsub_v1/services/publisher/client.py", + r"([^\S\r\n]+)timeout \(float\): (.*)\n", + ( + "\g<1>timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):\n" + "\g<1> \g<2>\n" + ), +) # ---------------------------------------------------------------------------- # Add templated files From a0a1fcb2685526d075c0ed6e551f2677674cfa77 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 26 Feb 2021 13:46:15 +0100 Subject: [PATCH 06/14] Apply timeout changes to async publisher client --- .../services/publisher/async_client.py | 85 +++++++++++++------ synth.py | 6 +- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/google/pubsub_v1/services/publisher/async_client.py b/google/pubsub_v1/services/publisher/async_client.py index 0597b6e88..02cf356fa 100644 --- a/google/pubsub_v1/services/publisher/async_client.py +++ b/google/pubsub_v1/services/publisher/async_client.py @@ -25,6 +25,7 @@ from google.api_core import exceptions # type: ignore from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore +from google.api_core import timeout as timeouts # type: ignore from google.auth import credentials # type: ignore from google.oauth2 import service_account # type: ignore @@ -148,7 +149,9 @@ async def create_topic( *, name: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Creates the given topic with the given name. See the [resource @@ -173,7 +176,8 @@ async def create_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -230,7 +234,9 @@ async def update_topic( request: pubsub.UpdateTopicRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Updates an existing topic. Note that certain @@ -242,7 +248,8 @@ async def update_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -289,7 +296,9 @@ async def publish( topic: str = None, messages: Sequence[pubsub.PubsubMessage] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -313,7 +322,8 @@ async def publish( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -382,7 +392,9 @@ async def get_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Gets the configuration of a topic. @@ -399,7 +411,8 @@ async def get_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -461,7 +474,9 @@ async def list_topics( *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsAsyncPager: r"""Lists matching topics. @@ -478,7 +493,8 @@ async def list_topics( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -550,7 +566,9 @@ async def list_topic_subscriptions( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSubscriptionsAsyncPager: r"""Lists the names of the attached subscriptions on this @@ -570,7 +588,8 @@ async def list_topic_subscriptions( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -642,7 +661,9 @@ async def list_topic_snapshots( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSnapshotsAsyncPager: r"""Lists the names of the snapshots on this topic. Snapshots are @@ -666,7 +687,8 @@ async def list_topic_snapshots( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -738,7 +760,9 @@ async def delete_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if @@ -761,7 +785,8 @@ async def delete_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. """ @@ -813,7 +838,9 @@ async def detach_subscription( request: pubsub.DetachSubscriptionRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.DetachSubscriptionResponse: r"""Detaches a subscription from this topic. All messages retained @@ -829,7 +856,8 @@ async def detach_subscription( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -876,7 +904,9 @@ async def set_iam_policy( request: iam_policy.SetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Sets the IAM access control policy on the specified @@ -887,7 +917,8 @@ async def set_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -982,7 +1013,9 @@ async def get_iam_policy( request: iam_policy.GetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Gets the IAM access control policy for a function. @@ -994,7 +1027,8 @@ async def get_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: @@ -1089,7 +1123,9 @@ async def test_iam_permissions( request: iam_policy.TestIamPermissionsRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> iam_policy.TestIamPermissionsResponse: r"""Tests the specified permissions against the IAM access control @@ -1101,7 +1137,8 @@ async def test_iam_permissions( `TestIamPermissions` method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (float): The timeout for this request. + timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. Returns: diff --git a/synth.py b/synth.py index 0fa438de5..05fe4273d 100644 --- a/synth.py +++ b/synth.py @@ -135,12 +135,12 @@ # Allow timeout to be an instance of google.api_core.timeout.* s.replace( - "google/pubsub_v1/services/publisher/client.py", + "google/pubsub_v1/services/publisher/*client.py", r"from google.api_core import retry as retries.*\n", "\g<0>from google.api_core import timeout as timeouts # type: ignore\n" ) s.replace( - "google/pubsub_v1/services/publisher/client.py", + "google/pubsub_v1/services/publisher/*client.py", r"(\s+)timeout: float = None.*\n", """ \g<1>timeout: Union[ @@ -148,7 +148,7 @@ \g<1>] = gapic_v1.method.DEFAULT,""", ) s.replace( - "google/pubsub_v1/services/publisher/client.py", + "google/pubsub_v1/services/publisher/*client.py", r"([^\S\r\n]+)timeout \(float\): (.*)\n", ( "\g<1>timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):\n" From ce8daea0b7afa36be0d39df3bc70473aaf115144 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 27 May 2021 12:14:32 +0200 Subject: [PATCH 07/14] Introduce TimeoutType type alias --- google/cloud/pubsub_v1/types.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 9e6f82536..355ee2d12 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -18,10 +18,12 @@ import enum import inspect import sys +from typing import Union import proto from google.api import http_pb2 +import google.api_core from google.api_core import gapic_v1 from google.iam.v1 import iam_policy_pb2 from google.iam.v1 import policy_pb2 @@ -37,6 +39,16 @@ from google.pubsub_v1.types import pubsub as pubsub_gapic_types +TimeoutType = Union[ + None, + int, + float, + google.api_core.timeout.ConstantTimeout, + google.api_core.timeout.ExponentialTimeout, +] +"""The type of the timeout parameter of publisher client methods.""" + + # Define the default values for batching. # # This class is used when creating a publisher or subscriber client, and @@ -206,6 +218,7 @@ def _get_protobuf_messages(module): _local_modules = [pubsub_gapic_types] names = [ + "TimeoutType", "BatchSettings", "LimitExceededBehavior", "PublishFlowControl", From 2adaba1b87234fbbbb5a1f063c2122d912ee66a3 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 27 May 2021 12:38:48 +0200 Subject: [PATCH 08/14] Update PublisherOptions docs --- google/cloud/pubsub_v1/types.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 355ee2d12..03da716e0 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -111,7 +111,7 @@ class LimitExceededBehavior(str, enum.Enum): # This class is used when creating a publisher client to pass in options # to enable/disable features. PublisherOptions = collections.namedtuple( - "PublisherConfig", ["enable_message_ordering", "flow_control", "retry", "timeout"] + "PublisherOptions", ["enable_message_ordering", "flow_control", "retry", "timeout"] ) PublisherOptions.__new__.__defaults__ = ( False, # enable_message_ordering: False @@ -129,12 +129,11 @@ class LimitExceededBehavior(str, enum.Enum): ) PublisherOptions.retry.__doc__ = ( "Retry settings for message publishing by the client. This should be " - "an instance of api.core.retry.Retry." + "an instance of :class:`google.api_core.retry.Retry`." ) PublisherOptions.timeout.__doc__ = ( - "Timeout settings for message publishing by the client. This should be " - "an instance of api.core.timeout.ConstantTimeout or an instance of " - "api.core.timeout.ExponentialTimeout." + "Timeout settings for message publishing by the client. It should be compatible " + "with :class:`~.pubsub_v1.types.TimeoutType`." ) # Define the type class and default values for flow control settings. From 072d6363fd6253165bf312a91511627c0a064c33 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 27 May 2021 13:12:13 +0200 Subject: [PATCH 09/14] Use type alias for timeout in upgrading guide --- UPGRADING.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/UPGRADING.md b/UPGRADING.md index 2238a0c8d..9ffdb5507 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -100,10 +100,7 @@ specified by the API producer. *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - google.api_core.timeout.ConstantTimeout, - google.api_core.timeout.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: google.pubsub_v1.types.TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: ``` From 44d94b8819e8a61ae67b6261d71a2fc68d5841cf Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 May 2021 18:45:00 +0200 Subject: [PATCH 10/14] Widen timeout types in generated publisher clients --- .../services/publisher/async_client.py | 118 +++++++++--------- google/pubsub_v1/services/publisher/client.py | 114 +++++++++-------- google/pubsub_v1/types/__init__.py | 12 +- synth.py | 44 ++++++- 4 files changed, 172 insertions(+), 116 deletions(-) diff --git a/google/pubsub_v1/services/publisher/async_client.py b/google/pubsub_v1/services/publisher/async_client.py index 98c81435a..f361180b9 100644 --- a/google/pubsub_v1/services/publisher/async_client.py +++ b/google/pubsub_v1/services/publisher/async_client.py @@ -33,7 +33,7 @@ from google.iam.v1 import policy_pb2 as policy # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub - +from google.pubsub_v1.types import TimeoutType from .transports.base import PublisherTransport, DEFAULT_CLIENT_INFO from .transports.grpc_asyncio import PublisherGrpcAsyncIOTransport from .client import PublisherClient @@ -178,9 +178,7 @@ async def create_topic( *, name: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Creates the given topic with the given name. See the [resource @@ -206,7 +204,7 @@ async def create_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -254,6 +252,9 @@ async def create_topic( gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -265,9 +266,7 @@ async def update_topic( request: pubsub.UpdateTopicRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Updates an existing topic. Note that certain @@ -276,10 +275,9 @@ async def update_topic( Args: request (:class:`google.pubsub_v1.types.UpdateTopicRequest`): The request object. Request for the UpdateTopic method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -315,6 +313,9 @@ async def update_topic( ), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -328,9 +329,7 @@ async def publish( topic: str = None, messages: Sequence[pubsub.PubsubMessage] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -352,10 +351,9 @@ async def publish( This corresponds to the ``messages`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -414,6 +412,9 @@ async def publish( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -426,9 +427,7 @@ async def get_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Gets the configuration of a topic. @@ -443,10 +442,9 @@ async def get_topic( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -498,6 +496,9 @@ async def get_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -510,9 +511,7 @@ async def list_topics( *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsAsyncPager: r"""Lists matching topics. @@ -527,10 +526,9 @@ async def list_topics( This corresponds to the ``project`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -586,6 +584,9 @@ async def list_topics( gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -604,9 +605,7 @@ async def list_topic_subscriptions( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSubscriptionsAsyncPager: r"""Lists the names of the attached subscriptions on this @@ -624,10 +623,9 @@ async def list_topic_subscriptions( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -683,6 +681,9 @@ async def list_topic_subscriptions( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -701,9 +702,7 @@ async def list_topic_snapshots( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSnapshotsAsyncPager: r"""Lists the names of the snapshots on this topic. Snapshots are @@ -725,10 +724,9 @@ async def list_topic_snapshots( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -784,6 +782,9 @@ async def list_topic_snapshots( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -802,9 +803,7 @@ async def delete_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if @@ -825,10 +824,9 @@ async def delete_topic( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -872,6 +870,9 @@ async def delete_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. await rpc( request, retry=retry, timeout=timeout, metadata=metadata, @@ -882,9 +883,7 @@ async def detach_subscription( request: pubsub.DetachSubscriptionRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.DetachSubscriptionResponse: r"""Detaches a subscription from this topic. All messages retained @@ -897,10 +896,9 @@ async def detach_subscription( request (:class:`google.pubsub_v1.types.DetachSubscriptionRequest`): The request object. Request for the DetachSubscription method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -938,6 +936,9 @@ async def detach_subscription( ), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -949,9 +950,7 @@ async def set_iam_policy( request: iam_policy.SetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Sets the IAM access control policy on the specified @@ -962,7 +961,7 @@ async def set_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1047,6 +1046,9 @@ async def set_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1058,9 +1060,7 @@ async def get_iam_policy( request: iam_policy.GetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Gets the IAM access control policy for a function. @@ -1072,7 +1072,7 @@ async def get_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1157,6 +1157,9 @@ async def get_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1168,9 +1171,7 @@ async def test_iam_permissions( request: iam_policy.TestIamPermissionsRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> iam_policy.TestIamPermissionsResponse: r"""Tests the specified permissions against the IAM access control @@ -1182,7 +1183,7 @@ async def test_iam_permissions( `TestIamPermissions` method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1211,6 +1212,9 @@ async def test_iam_permissions( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 5d266215d..42fe18a4e 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -38,6 +38,7 @@ from google.iam.v1 import policy_pb2 as policy # type: ignore from google.pubsub_v1.services.publisher import pagers from google.pubsub_v1.types import pubsub +from google.pubsub_v1.types import TimeoutType import grpc @@ -402,9 +403,7 @@ def create_topic( *, name: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Creates the given topic with the given name. See the [resource @@ -431,7 +430,7 @@ def create_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -473,6 +472,9 @@ def create_topic( gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -484,9 +486,7 @@ def update_topic( request: pubsub.UpdateTopicRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Updates an existing topic. Note that certain @@ -499,7 +499,7 @@ def update_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -529,6 +529,9 @@ def update_topic( ), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -542,9 +545,7 @@ def publish( topic: str = None, messages: Sequence[pubsub.PubsubMessage] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -570,7 +571,7 @@ def publish( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -614,6 +615,9 @@ def publish( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -626,9 +630,7 @@ def get_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Gets the configuration of a topic. @@ -647,7 +649,7 @@ def get_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -689,6 +691,9 @@ def get_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -701,9 +706,7 @@ def list_topics( *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: r"""Lists matching topics. @@ -722,7 +725,7 @@ def list_topics( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -768,6 +771,9 @@ def list_topics( gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -786,9 +792,7 @@ def list_topic_subscriptions( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSubscriptionsPager: r"""Lists the names of the attached subscriptions on this @@ -807,10 +811,9 @@ def list_topic_subscriptions( This corresponds to the ``topic`` field on the ``request`` instance; if ``request`` is provided, this should not be set. - retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -856,6 +859,9 @@ def list_topic_subscriptions( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -874,9 +880,7 @@ def list_topic_snapshots( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSnapshotsPager: r"""Lists the names of the snapshots on this topic. Snapshots are @@ -902,7 +906,7 @@ def list_topic_snapshots( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -948,6 +952,9 @@ def list_topic_snapshots( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -966,9 +973,7 @@ def delete_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if @@ -993,7 +998,7 @@ def delete_topic( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1031,6 +1036,9 @@ def delete_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. rpc( request, retry=retry, timeout=timeout, metadata=metadata, @@ -1041,9 +1049,7 @@ def detach_subscription( request: pubsub.DetachSubscriptionRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.DetachSubscriptionResponse: r"""Detaches a subscription from this topic. All messages retained @@ -1060,7 +1066,7 @@ def detach_subscription( retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1092,6 +1098,9 @@ def detach_subscription( ), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1103,9 +1112,7 @@ def set_iam_policy( request: iam_policy.SetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Sets the IAM access control policy on the specified @@ -1117,7 +1124,7 @@ def set_iam_policy( method. retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1205,6 +1212,9 @@ def set_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1216,9 +1226,7 @@ def get_iam_policy( request: iam_policy.GetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Gets the IAM access control policy for a function. @@ -1229,9 +1237,7 @@ def get_iam_policy( request (:class:`~.iam_policy.GetIamPolicyRequest`): The request object. Request message for `GetIamPolicy` method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, - should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1319,6 +1325,9 @@ def get_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1330,9 +1339,7 @@ def test_iam_permissions( request: iam_policy.TestIamPermissionsRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: Union[ - timeouts.ConstantTimeout, timeouts.ExponentialTimeout - ] = gapic_v1.method.DEFAULT, + timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> iam_policy.TestIamPermissionsResponse: r"""Tests the specified permissions against the IAM access control @@ -1343,9 +1350,7 @@ def test_iam_permissions( request (:class:`~.iam_policy.TestIamPermissionsRequest`): The request object. Request message for `TestIamPermissions` method. - retry (google.api_core.retry.Retry): Designation of what errors, if any, - should be retried. - timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]): + timeout (TimeoutType): The timeout for this request. metadata (Sequence[Tuple[str, str]]): Strings which should be sent along with the request as metadata. @@ -1374,6 +1379,9 @@ def test_iam_permissions( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) + if timeout is None or isinstance(timeout, (int, float)): + timeout = timeouts.ConstantTimeout(timeout) + # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) diff --git a/google/pubsub_v1/types/__init__.py b/google/pubsub_v1/types/__init__.py index 2894f6668..a17b89168 100644 --- a/google/pubsub_v1/types/__init__.py +++ b/google/pubsub_v1/types/__init__.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - # Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from typing import Union from .pubsub import ( AcknowledgeRequest, @@ -76,7 +76,17 @@ SchemaView, ) +TimeoutType = Union[ + None, + int, + float, + "google.api_core.timeout.ConstantTimeout", + "google.api_core.timeout.ExponentialTimeout", +] +"""The type of the timeout parameter of publisher client methods.""" + __all__ = ( + "TimeoutType", "AcknowledgeRequest", "CreateSnapshotRequest", "DeadLetterPolicy", diff --git a/synth.py b/synth.py index 27a783f92..d8823661f 100644 --- a/synth.py +++ b/synth.py @@ -249,27 +249,61 @@ ) # Allow timeout to be an instance of google.api_core.timeout.* +s.replace( + "google/pubsub_v1/types/__init__.py", + r"from \.pubsub import \(", + "from typing import Union\n\n\g<0>" +) +s.replace( + "google/pubsub_v1/types/__init__.py", + r"__all__ = \(\n", + textwrap.dedent('''\ + TimeoutType = Union[ + None, + int, + float, + "google.api_core.timeout.ConstantTimeout", + "google.api_core.timeout.ExponentialTimeout", + ] + """The type of the timeout parameter of publisher client methods.""" + + \g<0> "TimeoutType",''') +) + s.replace( "google/pubsub_v1/services/publisher/*client.py", r"from google.api_core import retry as retries.*\n", "\g<0>from google.api_core import timeout as timeouts # type: ignore\n" ) +s.replace( + "google/pubsub_v1/services/publisher/*client.py", + r"from google\.pubsub_v1\.types import pubsub", + "\g<0>\nfrom google.pubsub_v1.types import TimeoutType", +) + s.replace( "google/pubsub_v1/services/publisher/*client.py", r"(\s+)timeout: float = None.*\n", - """ - \g<1>timeout: Union[ - \g<1> timeouts.ConstantTimeout, timeouts.ExponentialTimeout - \g<1>] = gapic_v1.method.DEFAULT,""", + "\g<1>timeout: TimeoutType = gapic_v1.method.DEFAULT,", ) s.replace( "google/pubsub_v1/services/publisher/*client.py", r"([^\S\r\n]+)timeout \(float\): (.*)\n", ( - "\g<1>timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):\n" + "\g<1>timeout (TimeoutType):\n" "\g<1> \g<2>\n" ), ) +s.replace( + "google/pubsub_v1/services/publisher/*client.py", + r"([^\S\r\n]+)# Send the request\.\n", + textwrap.dedent("""\ + \g<1>if timeout is None or isinstance(timeout, (int, float)): + \g<1> timeout = timeouts.ConstantTimeout(timeout) + + \g<0>""" + ), +) # The namespace package declaration in google/cloud/__init__.py should be excluded # from coverage. From d1f0294a678bed90a1ec17bd7a90b8f38ce856ab Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 May 2021 19:28:37 +0200 Subject: [PATCH 11/14] Use TimeoutType in handwritten code --- .../cloud/pubsub_v1/publisher/_batch/thread.py | 7 ++----- .../pubsub_v1/publisher/_sequencer/base.py | 11 ++++++----- .../publisher/_sequencer/ordered_sequencer.py | 18 ++++++++---------- .../_sequencer/unordered_sequencer.py | 18 ++++++++---------- google/cloud/pubsub_v1/publisher/client.py | 10 +++------- google/cloud/pubsub_v1/types.py | 13 ------------- .../pubsub_v1/publisher/batch/test_thread.py | 7 ++----- 7 files changed, 29 insertions(+), 55 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 2ed8364ad..a993af970 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -73,10 +73,7 @@ class Batch(base.Batch): commit_retry (Optional[google.api_core.retry.Retry]): Designation of what errors, if any, should be retried when commiting the batch. If not provided, a default retry is used. - commit_timeout (Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]): + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply when commiting the batch. If not provided, a default timeout is used. """ @@ -89,7 +86,7 @@ def __init__( batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, - commit_timeout=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): self._client = client self._topic = topic diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/base.py b/google/cloud/pubsub_v1/publisher/_sequencer/base.py index 16c00cb8a..cb29e9a66 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/base.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/base.py @@ -16,6 +16,8 @@ import abc +from google.pubsub_v1 import types as gapic_types + class Sequencer(metaclass=abc.ABCMeta): """The base class for sequencers for Pub/Sub publishing. A sequencer @@ -45,7 +47,9 @@ def unpause(self, message): # pragma: NO COVER @staticmethod @abc.abstractmethod - def publish(self, message, retry=None, timeout=None): # pragma: NO COVER + def publish( + self, message, retry=None, timeout: gapic_types.TimeoutType = None + ): # pragma: NO COVER """ Publish message for this ordering key. Args: @@ -53,10 +57,7 @@ def publish(self, message, retry=None, timeout=None): # pragma: NO COVER The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. - timeout (Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]): + timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply when publishing the message. Returns: diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index 1848d676d..83dd0c921 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -21,6 +21,7 @@ from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base from google.cloud.pubsub_v1.publisher._batch import base as batch_base +from google.pubsub_v1 import types as gapic_types class _OrderedSequencerStatus(str, enum.Enum): @@ -229,7 +230,7 @@ def unpause(self): def _create_batch( self, commit_retry=gapic_v1.method.DEFAULT, - commit_timeout=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): """ Create a new batch using the client's batch class and other stored settings. @@ -237,10 +238,7 @@ def _create_batch( Args: commit_retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the batch. - commit_timeout (Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]): + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply when publishing the batch. """ return self._client._batch_class( @@ -254,7 +252,10 @@ def _create_batch( ) def publish( - self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT + self, + message, + retry=gapic_v1.method.DEFAULT, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): """ Publish message for this ordering key. @@ -263,10 +264,7 @@ def publish( The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. - timeout (Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]): + timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply when publishing the message. Returns: diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index 627f1177d..76dd1cad7 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -15,6 +15,7 @@ from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher._sequencer import base +from google.pubsub_v1 import types as gapic_types class UnorderedSequencer(base.Sequencer): @@ -80,7 +81,7 @@ def unpause(self): def _create_batch( self, commit_retry=gapic_v1.method.DEFAULT, - commit_timeout=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): """ Create a new batch using the client's batch class and other stored settings. @@ -88,10 +89,7 @@ def _create_batch( Args: commit_retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the batch. - commit_timeout (Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]): + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply when publishing the batch. """ return self._client._batch_class( @@ -105,7 +103,10 @@ def _create_batch( ) def publish( - self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT + self, + message, + retry=gapic_v1.method.DEFAULT, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): """ Batch message into existing or new batch. @@ -114,10 +115,7 @@ def publish( The Pub/Sub message. retry (Optional[google.api_core.retry.Retry]): The retry settings to apply when publishing the message. - timeout (Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]): + timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply when publishing the message. Returns: diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 779b67f61..c47e65e59 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -235,7 +235,7 @@ def publish( data, ordering_key="", retry=gapic_v1.method.DEFAULT, - timeout=gapic_v1.method.DEFAULT, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, **attrs ): """Publish a single message. @@ -273,15 +273,11 @@ def publish( publish order should be respected. Message ordering must be enabled for this client to use this feature. retry (Optional[google.api_core.retry.Retry]): Designation of what - errors, if any, should be retried. If `ordering_key` is specified, + errors, if any, should be retried. If `ordering_key` is specified,\ the total retry deadline will be changed to "infinity". If given, it overides any retry passed into the client through the ``publisher_options`` argument. - - timeout (Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]): + timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout for the RPC request. Can be used to override any timeout passed in through ``publisher_options`` when instantiating the client. diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 03da716e0..6ae71130a 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -18,12 +18,10 @@ import enum import inspect import sys -from typing import Union import proto from google.api import http_pb2 -import google.api_core from google.api_core import gapic_v1 from google.iam.v1 import iam_policy_pb2 from google.iam.v1 import policy_pb2 @@ -39,16 +37,6 @@ from google.pubsub_v1.types import pubsub as pubsub_gapic_types -TimeoutType = Union[ - None, - int, - float, - google.api_core.timeout.ConstantTimeout, - google.api_core.timeout.ExponentialTimeout, -] -"""The type of the timeout parameter of publisher client methods.""" - - # Define the default values for batching. # # This class is used when creating a publisher or subscriber client, and @@ -217,7 +205,6 @@ def _get_protobuf_messages(module): _local_modules = [pubsub_gapic_types] names = [ - "TimeoutType", "BatchSettings", "LimitExceededBehavior", "PublishFlowControl", diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index e9ff984e8..abf5ec76f 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -42,7 +42,7 @@ def create_batch( batch_done_callback=None, commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, - commit_timeout=gapic_v1.method.DEFAULT, + commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, **batch_settings ): """Return a batch object suitable for testing. @@ -55,10 +55,7 @@ def create_batch( has reached byte-size or number-of-messages limits. commit_retry (Optional[google.api_core.retry.Retry]): The retry settings for the batch commit call. - commit_timeout (Optional[Union[ \ - googole.api_core.timeout.ConstantTimeout, \ - googole.api_core.timeout.ExponentialTimeout \ - ]]): + commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply to the batch commit call. batch_settings (Mapping[str, str]): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. From 05b556e8f07f0e2dd8d5c6c22cf7f445c8df121b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 May 2021 19:55:28 +0200 Subject: [PATCH 12/14] Remove redundant backslash --- google/cloud/pubsub_v1/publisher/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index c47e65e59..cf7e47668 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -273,7 +273,7 @@ def publish( publish order should be respected. Message ordering must be enabled for this client to use this feature. retry (Optional[google.api_core.retry.Retry]): Designation of what - errors, if any, should be retried. If `ordering_key` is specified,\ + errors, if any, should be retried. If `ordering_key` is specified, the total retry deadline will be changed to "infinity". If given, it overides any retry passed into the client through the ``publisher_options`` argument. From 762b129f4511750802b552e1b1efac8c621cc38a Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 May 2021 21:32:37 +0200 Subject: [PATCH 13/14] Use DEFAULT as a defualt timeout in base sequencer --- google/cloud/pubsub_v1/publisher/_sequencer/base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/base.py b/google/cloud/pubsub_v1/publisher/_sequencer/base.py index cb29e9a66..60a7d269c 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/base.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/base.py @@ -16,6 +16,7 @@ import abc +from google.api_core import gapic_v1 from google.pubsub_v1 import types as gapic_types @@ -48,7 +49,10 @@ def unpause(self, message): # pragma: NO COVER @staticmethod @abc.abstractmethod def publish( - self, message, retry=None, timeout: gapic_types.TimeoutType = None + self, + message, + retry=None, + timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, ): # pragma: NO COVER """ Publish message for this ordering key. From b7939d1fd7e8fe65d8b8e5d5558facfc2ab1542f Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Fri, 28 May 2021 21:47:28 +0200 Subject: [PATCH 14/14] Do not accept plain None as a valid timeout Using no timeout is not a good idea, but if one really wants to, they can pass it in as ConstantTimeout(None). As a side effect, the logic of converting a constant into a COnstantTimeout instance can be removed, as this is already handled in api-core for int and float values. --- .../services/publisher/async_client.py | 36 ------------------- google/pubsub_v1/services/publisher/client.py | 36 ------------------- google/pubsub_v1/types/__init__.py | 1 - synth.py | 11 ------ 4 files changed, 84 deletions(-) diff --git a/google/pubsub_v1/services/publisher/async_client.py b/google/pubsub_v1/services/publisher/async_client.py index f361180b9..041391c57 100644 --- a/google/pubsub_v1/services/publisher/async_client.py +++ b/google/pubsub_v1/services/publisher/async_client.py @@ -252,9 +252,6 @@ async def create_topic( gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -313,9 +310,6 @@ async def update_topic( ), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -412,9 +406,6 @@ async def publish( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -496,9 +487,6 @@ async def get_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -584,9 +572,6 @@ async def list_topics( gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -681,9 +666,6 @@ async def list_topic_subscriptions( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -782,9 +764,6 @@ async def list_topic_snapshots( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -870,9 +849,6 @@ async def delete_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. await rpc( request, retry=retry, timeout=timeout, metadata=metadata, @@ -936,9 +912,6 @@ async def detach_subscription( ), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1046,9 +1019,6 @@ async def set_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1157,9 +1127,6 @@ async def get_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1212,9 +1179,6 @@ async def test_iam_permissions( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 42fe18a4e..3f249b01b 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -472,9 +472,6 @@ def create_topic( gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -529,9 +526,6 @@ def update_topic( ), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -615,9 +609,6 @@ def publish( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -691,9 +682,6 @@ def get_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -771,9 +759,6 @@ def list_topics( gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -859,9 +844,6 @@ def list_topic_subscriptions( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -952,9 +934,6 @@ def list_topic_snapshots( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1036,9 +1015,6 @@ def delete_topic( gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. rpc( request, retry=retry, timeout=timeout, metadata=metadata, @@ -1098,9 +1074,6 @@ def detach_subscription( ), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1212,9 +1185,6 @@ def set_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1325,9 +1295,6 @@ def get_iam_policy( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) @@ -1379,9 +1346,6 @@ def test_iam_permissions( gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)), ) - if timeout is None or isinstance(timeout, (int, float)): - timeout = timeouts.ConstantTimeout(timeout) - # Send the request. response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) diff --git a/google/pubsub_v1/types/__init__.py b/google/pubsub_v1/types/__init__.py index a17b89168..ebc8b5399 100644 --- a/google/pubsub_v1/types/__init__.py +++ b/google/pubsub_v1/types/__init__.py @@ -77,7 +77,6 @@ ) TimeoutType = Union[ - None, int, float, "google.api_core.timeout.ConstantTimeout", diff --git a/synth.py b/synth.py index d8823661f..41b63e89e 100644 --- a/synth.py +++ b/synth.py @@ -259,7 +259,6 @@ r"__all__ = \(\n", textwrap.dedent('''\ TimeoutType = Union[ - None, int, float, "google.api_core.timeout.ConstantTimeout", @@ -294,16 +293,6 @@ "\g<1> \g<2>\n" ), ) -s.replace( - "google/pubsub_v1/services/publisher/*client.py", - r"([^\S\r\n]+)# Send the request\.\n", - textwrap.dedent("""\ - \g<1>if timeout is None or isinstance(timeout, (int, float)): - \g<1> timeout = timeouts.ConstantTimeout(timeout) - - \g<0>""" - ), -) # The namespace package declaration in google/cloud/__init__.py should be excluded # from coverage.