Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Allow retry and timeout settings on the publisher client #239

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 45 additions & 1 deletion UPGRADING.md
Expand Up @@ -100,7 +100,10 @@ specified by the API producer.
*,
project: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
google.api_core.timeout.ConstantTimeout,
google.api_core.timeout.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicsPager:
```
Expand Down Expand Up @@ -161,3 +164,44 @@ The publisher and subscriber clients cannot be constructed with `client_config`
argument anymore. If you want to customize retry and timeout settings for a particular
method, you need to do it upon method invocation by passing the custom `timeout` and
`retry` arguments, respectively.


## Custom Retry and Timeout settings for Publisher Client

The ``publisher_options`` parameter to the Publisher Client, as well as all of the
client's methods, now accept custom retry and timeout settings:

```py
custom_retry = api_core.retry.Retry(
initial=0.250, # seconds (default: 0.1)
maximum=90.0, # seconds (default: 60.0)
multiplier=1.45, # default: 1.3
deadline=300.0, # seconds (default: 60.0)
predicate=api_core.retry.if_exception_type(
api_core.exceptions.Aborted,
api_core.exceptions.DeadlineExceeded,
api_core.exceptions.InternalServerError,
api_core.exceptions.ResourceExhausted,
api_core.exceptions.ServiceUnavailable,
api_core.exceptions.Unknown,
api_core.exceptions.Cancelled,
),
)

custom_timeout=api_core.timeout.ExponentialTimeout(
initial=1.0,
maximum=10.0,
multiplier=1.0,
deadline=300.0,
)

publisher = pubsub_v1.PublisherClient(
publisher_options = pubsub_v1.types.PublisherOptions(
retry=custom_retry,
timeout=custom_timeout,
),
)
```

The timeout can be either an instance of `google.api_core.timeout.ConstantTimeout`,
or an instance of `google.api_core.timeout.ExponentialTimeout`, as in the example.
8 changes: 7 additions & 1 deletion google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -272,7 +272,13 @@ def resume_publish(self, topic, ordering_key):
sequencer.unpause()

def publish(
self, topic, data, ordering_key="", retry=gapic_v1.method.DEFAULT, **attrs
self,
topic,
data,
ordering_key="",
retry=gapic_v1.method.DEFAULT,
Copy link
Contributor

@pradn pradn Dec 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If retry and timeout are added to PublisherOptions, then there's no need to add these parameters here, yes? The user passes them in using the **attrs, like here:

publisher_options = pubsub_v1.types.PublisherOptions(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somebody might want to override these default options for a few particular publish() calls?

I'd say we keep these two arguments for flexibility, as well as for consistency with other Publisher methods. Besides, the publish() method signature already includes the retry parameter and thus shouldn't be removed, and it makes more sense to retain timeout with it.

timeout=gapic_v1.method.DEFAULT,
**attrs
):
"""Publish a single message.

Expand Down
14 changes: 13 additions & 1 deletion google/cloud/pubsub_v1/types.py
Expand Up @@ -22,6 +22,7 @@
import proto

from google.api import http_pb2
from google.api_core import gapic_v1
from google.iam.v1 import iam_policy_pb2
from google.iam.v1 import policy_pb2
from google.iam.v1.logging import audit_data_pb2
Expand Down Expand Up @@ -98,11 +99,13 @@ class LimitExceededBehavior(str, enum.Enum):
# This class is used when creating a publisher client to pass in options
# to enable/disable features.
PublisherOptions = collections.namedtuple(
"PublisherConfig", ["enable_message_ordering", "flow_control"]
"PublisherConfig", ["enable_message_ordering", "flow_control", "retry", "timeout"]
)
PublisherOptions.__new__.__defaults__ = (
False, # enable_message_ordering: False
PublishFlowControl(), # default flow control settings
gapic_v1.method.DEFAULT, # use default api_core value for retry
gapic_v1.method.DEFAULT, # use default api_core value for timeout
)
PublisherOptions.__doc__ = "The options for the publisher client."
PublisherOptions.enable_message_ordering.__doc__ = (
Expand All @@ -115,6 +118,15 @@ class LimitExceededBehavior(str, enum.Enum):
"Flow control settings for message publishing by the client. By default "
"the publisher client does not do any throttling."
)
PublisherOptions.retry.__doc__ = (
"Retry settings for message publishing by the client. This should be "
"an instance of api.core.retry.Retry."
)
PublisherOptions.timeout.__doc__ = (
"Timeout settings for message publishing by the client. This should be "
"an instance of api.core.timeout.ConstantTimeout or an instance of "
"api.core.timeout.ExponentialTimeout."
)

# Define the type class and default values for flow control settings.
#
Expand Down
49 changes: 37 additions & 12 deletions google/pubsub_v1/services/publisher/client.py
Expand Up @@ -26,6 +26,7 @@
from google.api_core import exceptions # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore
from google.api_core import timeout as timeouts # type: ignore
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a generated file, but I included it in this PR so that it can be seen what the synth changes will do to it.

from google.auth import credentials # type: ignore
from google.auth.transport import mtls # type: ignore
from google.auth.transport.grpc import SslCredentials # type: ignore
Expand Down Expand Up @@ -363,7 +364,9 @@ def create_topic(
*,
name: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.Topic:
r"""Creates the given topic with the given name. See the `resource
Expand Down Expand Up @@ -441,7 +444,9 @@ def update_topic(
request: pubsub.UpdateTopicRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.Topic:
r"""Updates an existing topic. Note that certain
Expand Down Expand Up @@ -496,7 +501,9 @@ def publish(
topic: str = None,
messages: Sequence[pubsub.PubsubMessage] = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.PublishResponse:
r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
Expand Down Expand Up @@ -577,7 +584,9 @@ def get_topic(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.Topic:
r"""Gets the configuration of a topic.
Expand Down Expand Up @@ -648,7 +657,9 @@ def list_topics(
*,
project: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicsPager:
r"""Lists matching topics.
Expand Down Expand Up @@ -729,7 +740,9 @@ def list_topic_subscriptions(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicSubscriptionsPager:
r"""Lists the names of the attached subscriptions on this
Expand Down Expand Up @@ -813,7 +826,9 @@ def list_topic_snapshots(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicSnapshotsPager:
r"""Lists the names of the snapshots on this topic. Snapshots are
Expand Down Expand Up @@ -901,7 +916,9 @@ def delete_topic(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> None:
r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if
Expand Down Expand Up @@ -972,7 +989,9 @@ def detach_subscription(
request: pubsub.DetachSubscriptionRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.DetachSubscriptionResponse:
r"""Detaches a subscription from this topic. All messages retained
Expand Down Expand Up @@ -1031,7 +1050,9 @@ def set_iam_policy(
request: iam_policy.SetIamPolicyRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> policy.Policy:
r"""Sets the IAM access control policy on the specified
Expand Down Expand Up @@ -1141,7 +1162,9 @@ def get_iam_policy(
request: iam_policy.GetIamPolicyRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> policy.Policy:
r"""Gets the IAM access control policy for a function.
Expand Down Expand Up @@ -1252,7 +1275,9 @@ def test_iam_permissions(
request: iam_policy.TestIamPermissionsRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> iam_policy.TestIamPermissionsResponse:
r"""Tests the specified permissions against the IAM access control
Expand Down
15 changes: 15 additions & 0 deletions synth.py
Expand Up @@ -104,6 +104,21 @@
"\n\g<0>",
)

# Allow timeout to be an instance of google.api_core.timeout.*
s.replace(
"google/pubsub_v1/services/publisher/client.py",
r"from google.api_core import retry as retries.*\n",
"\g<0>from google.api_core import timeout as timeouts # type: ignore\n"
)
s.replace(
"google/pubsub_v1/services/publisher/client.py",
r"(\s+)timeout: float = None.*\n",
"""
\g<1>timeout: Union[
\g<1> timeouts.ConstantTimeout, timeouts.ExponentialTimeout
\g<1>] = gapic_v1.method.DEFAULT,""",
)

# ----------------------------------------------------------------------------
# Add templated files
# ----------------------------------------------------------------------------
Expand Down