From 729a5e7e856a30cc81b978248f3110509a01206c Mon Sep 17 00:00:00 2001 From: Carlos de la Guardia Date: Mon, 16 Nov 2020 01:02:11 -0600 Subject: [PATCH 1/2] feat: allow retry and timeout settings on publisher client --- google/cloud/pubsub_v1/publisher/client.py | 8 ++- google/cloud/pubsub_v1/types.py | 14 +++++- google/pubsub_v1/services/publisher/client.py | 49 ++++++++++++++----- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 6a9418e69..365844cd1 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -272,7 +272,13 @@ def resume_publish(self, topic, ordering_key): sequencer.unpause() def publish( - self, topic, data, ordering_key="", retry=gapic_v1.method.DEFAULT, **attrs + self, + topic, + data, + ordering_key="", + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + **attrs ): """Publish a single message. diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index b875f3cd2..573897cc5 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -22,6 +22,7 @@ import proto from google.api import http_pb2 +from google.api_core import gapic_v1 from google.iam.v1 import iam_policy_pb2 from google.iam.v1 import policy_pb2 from google.iam.v1.logging import audit_data_pb2 @@ -98,11 +99,13 @@ class LimitExceededBehavior(str, enum.Enum): # This class is used when creating a publisher client to pass in options # to enable/disable features. PublisherOptions = collections.namedtuple( - "PublisherConfig", ["enable_message_ordering", "flow_control"] + "PublisherConfig", ["enable_message_ordering", "flow_control", "retry", "timeout"] ) PublisherOptions.__new__.__defaults__ = ( False, # enable_message_ordering: False PublishFlowControl(), # default flow control settings + gapic_v1.method.DEFAULT, # use default api_core value for retry + gapic_v1.method.DEFAULT, # use default api_core value for timeout ) PublisherOptions.__doc__ = "The options for the publisher client." PublisherOptions.enable_message_ordering.__doc__ = ( @@ -115,6 +118,15 @@ class LimitExceededBehavior(str, enum.Enum): "Flow control settings for message publishing by the client. By default " "the publisher client does not do any throttling." ) +PublisherOptions.retry.__doc__ = ( + "Retry settings for message publishing by the client. This should be " + "an instance of api.core.retry.Retry." +) +PublisherOptions.timeout.__doc__ = ( + "Timeout settings for message publishing by the client. This should be " + "an instance of api.core.timeout.ConstantTimeout or an instance of " + "api.core.timeout.ExponentialTimeout." +) # Define the type class and default values for flow control settings. # diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 188b3dccb..f618103f4 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -26,6 +26,7 @@ from google.api_core import exceptions # type: ignore from google.api_core import gapic_v1 # type: ignore from google.api_core import retry as retries # type: ignore +from google.api_core import timeout as timeouts # type: ignore from google.auth import credentials # type: ignore from google.auth.transport import mtls # type: ignore from google.auth.transport.grpc import SslCredentials # type: ignore @@ -363,7 +364,9 @@ def create_topic( *, name: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Creates the given topic with the given name. See the `resource @@ -441,7 +444,9 @@ def update_topic( request: pubsub.UpdateTopicRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Updates an existing topic. Note that certain @@ -496,7 +501,9 @@ def publish( topic: str = None, messages: Sequence[pubsub.PubsubMessage] = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -577,7 +584,9 @@ def get_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.Topic: r"""Gets the configuration of a topic. @@ -648,7 +657,9 @@ def list_topics( *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: r"""Lists matching topics. @@ -729,7 +740,9 @@ def list_topic_subscriptions( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSubscriptionsPager: r"""Lists the names of the attached subscriptions on this @@ -813,7 +826,9 @@ def list_topic_snapshots( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicSnapshotsPager: r"""Lists the names of the snapshots on this topic. Snapshots are @@ -901,7 +916,9 @@ def delete_topic( *, topic: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> None: r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if @@ -972,7 +989,9 @@ def detach_subscription( request: pubsub.DetachSubscriptionRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.DetachSubscriptionResponse: r"""Detaches a subscription from this topic. All messages retained @@ -1031,7 +1050,9 @@ def set_iam_policy( request: iam_policy.SetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Sets the IAM access control policy on the specified @@ -1141,7 +1162,9 @@ def get_iam_policy( request: iam_policy.GetIamPolicyRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> policy.Policy: r"""Gets the IAM access control policy for a function. @@ -1252,7 +1275,9 @@ def test_iam_permissions( request: iam_policy.TestIamPermissionsRequest = None, *, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + timeouts.ConstantTimeout, timeouts.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> iam_policy.TestIamPermissionsResponse: r"""Tests the specified permissions against the IAM access control From a9b5190f340404232050fa1ac8416e8279dc7c7d Mon Sep 17 00:00:00 2001 From: Carlos de la Guardia Date: Mon, 16 Nov 2020 03:23:16 -0600 Subject: [PATCH 2/2] build: update generated code and update doc --- UPGRADING.md | 46 +++++++++++++++++++++++++++++++++++++++++++++- synth.py | 15 +++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/UPGRADING.md b/UPGRADING.md index 3837464fc..2238a0c8d 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -100,7 +100,10 @@ specified by the API producer. *, project: str = None, retry: retries.Retry = gapic_v1.method.DEFAULT, - timeout: float = None, + timeout: Union[ + google.api_core.timeout.ConstantTimeout, + google.api_core.timeout.ExponentialTimeout + ] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), ) -> pagers.ListTopicsPager: ``` @@ -161,3 +164,44 @@ The publisher and subscriber clients cannot be constructed with `client_config` argument anymore. If you want to customize retry and timeout settings for a particular method, you need to do it upon method invocation by passing the custom `timeout` and `retry` arguments, respectively. + + +## Custom Retry and Timeout settings for Publisher Client + +The ``publisher_options`` parameter to the Publisher Client, as well as all of the +client's methods, now accept custom retry and timeout settings: + +```py +custom_retry = api_core.retry.Retry( + initial=0.250, # seconds (default: 0.1) + maximum=90.0, # seconds (default: 60.0) + multiplier=1.45, # default: 1.3 + deadline=300.0, # seconds (default: 60.0) + predicate=api_core.retry.if_exception_type( + api_core.exceptions.Aborted, + api_core.exceptions.DeadlineExceeded, + api_core.exceptions.InternalServerError, + api_core.exceptions.ResourceExhausted, + api_core.exceptions.ServiceUnavailable, + api_core.exceptions.Unknown, + api_core.exceptions.Cancelled, + ), +) + +custom_timeout=api_core.timeout.ExponentialTimeout( + initial=1.0, + maximum=10.0, + multiplier=1.0, + deadline=300.0, +) + +publisher = pubsub_v1.PublisherClient( + publisher_options = pubsub_v1.types.PublisherOptions( + retry=custom_retry, + timeout=custom_timeout, + ), +) +``` + +The timeout can be either an instance of `google.api_core.timeout.ConstantTimeout`, +or an instance of `google.api_core.timeout.ExponentialTimeout`, as in the example. diff --git a/synth.py b/synth.py index fe1b0838e..cf6b8ed72 100644 --- a/synth.py +++ b/synth.py @@ -104,6 +104,21 @@ "\n\g<0>", ) +# Allow timeout to be an instance of google.api_core.timeout.* +s.replace( + "google/pubsub_v1/services/publisher/client.py", + r"from google.api_core import retry as retries.*\n", + "\g<0>from google.api_core import timeout as timeouts # type: ignore\n" +) +s.replace( + "google/pubsub_v1/services/publisher/client.py", + r"(\s+)timeout: float = None.*\n", + """ + \g<1>timeout: Union[ + \g<1> timeouts.ConstantTimeout, timeouts.ExponentialTimeout + \g<1>] = gapic_v1.method.DEFAULT,""", +) + # ---------------------------------------------------------------------------- # Add templated files # ----------------------------------------------------------------------------