Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support customizable retry and timeout settings on the publisher client #299

Merged
merged 19 commits into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 42 additions & 1 deletion UPGRADING.md
Expand Up @@ -100,7 +100,7 @@ specified by the API producer.
*,
project: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: google.pubsub_v1.types.TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicsPager:
```
Expand Down Expand Up @@ -161,3 +161,44 @@ The publisher and subscriber clients cannot be constructed with `client_config`
argument anymore. If you want to customize retry and timeout settings for a particular
method, you need to do it upon method invocation by passing the custom `timeout` and
`retry` arguments, respectively.


## Custom Retry and Timeout settings for Publisher Client

The ``publisher_options`` parameter to the Publisher Client, as well as all of the
client's methods, now accept custom retry and timeout settings:

```py
custom_retry = api_core.retry.Retry(
initial=0.250, # seconds (default: 0.1)
maximum=90.0, # seconds (default: 60.0)
multiplier=1.45, # default: 1.3
deadline=300.0, # seconds (default: 60.0)
predicate=api_core.retry.if_exception_type(
api_core.exceptions.Aborted,
api_core.exceptions.DeadlineExceeded,
api_core.exceptions.InternalServerError,
api_core.exceptions.ResourceExhausted,
api_core.exceptions.ServiceUnavailable,
api_core.exceptions.Unknown,
api_core.exceptions.Cancelled,
),
)

custom_timeout=api_core.timeout.ExponentialTimeout(
initial=1.0,
maximum=10.0,
multiplier=1.0,
deadline=300.0,
)

publisher = pubsub_v1.PublisherClient(
publisher_options = pubsub_v1.types.PublisherOptions(
retry=custom_retry,
timeout=custom_timeout,
),
)
```

The timeout can be either an instance of `google.api_core.timeout.ConstantTimeout`,
or an instance of `google.api_core.timeout.ExponentialTimeout`, as in the example.
13 changes: 12 additions & 1 deletion google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -73,6 +73,12 @@ class Batch(base.Batch):
commit_retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried when commiting the batch. If not
provided, a default retry is used.
commit_timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
The timeout to apply when commiting the batch. If not provided, a
default timeout is used.
"""

def __init__(
Expand All @@ -83,6 +89,7 @@ def __init__(
batch_done_callback=None,
commit_when_full=True,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout=gapic_v1.method.DEFAULT,
):
self._client = client
self._topic = topic
Expand All @@ -106,6 +113,7 @@ def __init__(
self._size = self._base_request_size

self._commit_retry = commit_retry
self._commit_timeout = commit_timeout

@staticmethod
def make_lock():
Expand Down Expand Up @@ -259,7 +267,10 @@ def _commit(self):
try:
# Performs retries for errors defined by the retry configuration.
response = self._client.api.publish(
topic=self._topic, messages=self._messages, retry=self._commit_retry
topic=self._topic,
messages=self._messages,
retry=self._commit_retry,
timeout=self._commit_timeout,
)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, even after retries, so set the exception on
Expand Down
7 changes: 6 additions & 1 deletion google/cloud/pubsub_v1/publisher/_sequencer/base.py
Expand Up @@ -45,14 +45,19 @@ def unpause(self, message): # pragma: NO COVER

@staticmethod
@abc.abstractmethod
def publish(self, message, retry=None): # pragma: NO COVER
def publish(self, message, retry=None, timeout=None): # pragma: NO COVER
""" Publish message for this ordering key.

Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
The timeout to apply when publishing the message.

Returns:
A class instance that conforms to Python Standard library's
Expand Down
27 changes: 23 additions & 4 deletions google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py
Expand Up @@ -226,13 +226,22 @@ def unpause(self):
raise RuntimeError("Ordering key is not paused.")
self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout=gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.

Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
client=self._client,
Expand All @@ -241,16 +250,24 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
batch_done_callback=self._batch_done_callback,
commit_when_full=False,
commit_retry=commit_retry,
commit_timeout=commit_timeout,
)

def publish(self, message, retry=gapic_v1.method.DEFAULT):
def publish(
self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT
):
""" Publish message for this ordering key.

Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
The timeout to apply when publishing the message.

Returns:
A class instance that conforms to Python Standard library's
Expand Down Expand Up @@ -287,13 +304,15 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
), "Publish is only allowed in accepting-messages state."

if not self._ordered_batches:
new_batch = self._create_batch(commit_retry=retry)
new_batch = self._create_batch(
commit_retry=retry, commit_timeout=timeout
)
self._ordered_batches.append(new_batch)

batch = self._ordered_batches[-1]
future = batch.publish(message)
while future is None:
batch = self._create_batch(commit_retry=retry)
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._ordered_batches.append(batch)
future = batch.publish(message)

Expand Down
25 changes: 21 additions & 4 deletions google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py
Expand Up @@ -77,13 +77,22 @@ def unpause(self):
""" Not relevant for this class. """
raise NotImplementedError

def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout=gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.

Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
client=self._client,
Expand All @@ -92,16 +101,24 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
batch_done_callback=None,
commit_when_full=True,
commit_retry=commit_retry,
commit_timeout=commit_timeout,
)

def publish(self, message, retry=gapic_v1.method.DEFAULT):
def publish(
self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT
):
""" Batch message into existing or new batch.

Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
The timeout to apply when publishing the message.

Returns:
~google.api_core.future.Future: An object conforming to
Expand All @@ -119,7 +136,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
raise RuntimeError("Unordered sequencer already stopped.")

if not self._current_batch:
newbatch = self._create_batch(commit_retry=retry)
newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._current_batch = newbatch

batch = self._current_batch
Expand All @@ -129,7 +146,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
future = batch.publish(message)
# batch is full, triggering commit_when_full
if future is None:
batch = self._create_batch(commit_retry=retry)
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
# At this point, we lose track of the old batch, but we don't
# care since it's already committed (because it was full.)
self._current_batch = batch
Expand Down
26 changes: 24 additions & 2 deletions google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -230,7 +230,13 @@ def resume_publish(self, topic, ordering_key):
sequencer.unpause()

def publish(
self, topic, data, ordering_key="", retry=gapic_v1.method.DEFAULT, **attrs
self,
topic,
data,
ordering_key="",
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
plamut marked this conversation as resolved.
Show resolved Hide resolved
**attrs
):
"""Publish a single message.

Expand Down Expand Up @@ -269,6 +275,16 @@ def publish(
retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried. If `ordering_key` is specified,
the total retry deadline will be changed to "infinity".
If given, it overides any retry passed into the client through
the ``publisher_options`` argument.

timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
The timeout for the RPC request. Can be used to override any timeout
passed in through ``publisher_options`` when instantiating the client.

attrs (Mapping[str, str]): A dictionary of attributes to be
sent as metadata. (These may be text strings or byte strings.)

Expand Down Expand Up @@ -331,6 +347,12 @@ def publish(
def on_publish_done(future):
self._flow_controller.release(message)

if retry is gapic_v1.method.DEFAULT: # if custom retry not passed in
retry = self.publisher_options.retry

if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in
timeout = self.publisher_options.timeout

with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")
Expand All @@ -347,7 +369,7 @@ def on_publish_done(future):

# Delegate the publishing to the sequencer.
sequencer = self._get_or_create_sequencer(topic, ordering_key)
future = sequencer.publish(message, retry=retry)
future = sequencer.publish(message, retry=retry, timeout=timeout)
future.add_done_callback(on_publish_done)

# Create a timer thread if necessary to enforce the batching
Expand Down
26 changes: 25 additions & 1 deletion google/cloud/pubsub_v1/types.py
Expand Up @@ -18,10 +18,13 @@
import enum
import inspect
import sys
from typing import Union

import proto

from google.api import http_pb2
import google.api_core
from google.api_core import gapic_v1
from google.iam.v1 import iam_policy_pb2
from google.iam.v1 import policy_pb2
from google.iam.v1.logging import audit_data_pb2
Expand All @@ -36,6 +39,16 @@
from google.pubsub_v1.types import pubsub as pubsub_gapic_types


TimeoutType = Union[
None,
Copy link
Contributor

@jimfulton jimfulton May 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we mean to add None? Was this my fault?

This feels like an unintentional change. We didn't accept None before.

If we want to accept None, then I think we should update

https://github.com/googleapis/python-api-core/blob/7337c6b123735fb9ae5d0e54b4399275719c020c/google/api_core/gapic_v1/method.py#L66-L67

to treat None like DEFAULT.

Copy link
Contributor Author

@plamut plamut May 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we did, even though the argument description says otherwise. But the generated code actually used None as a default value for a supposedly float-only argument. :)

int,
float,
google.api_core.timeout.ConstantTimeout,
google.api_core.timeout.ExponentialTimeout,
]
"""The type of the timeout parameter of publisher client methods."""


# Define the default values for batching.
#
# This class is used when creating a publisher or subscriber client, and
Expand Down Expand Up @@ -98,11 +111,13 @@ class LimitExceededBehavior(str, enum.Enum):
# This class is used when creating a publisher client to pass in options
# to enable/disable features.
PublisherOptions = collections.namedtuple(
"PublisherConfig", ["enable_message_ordering", "flow_control"]
"PublisherOptions", ["enable_message_ordering", "flow_control", "retry", "timeout"]
)
PublisherOptions.__new__.__defaults__ = (
False, # enable_message_ordering: False
PublishFlowControl(), # default flow control settings
gapic_v1.method.DEFAULT, # use default api_core value for retry
gapic_v1.method.DEFAULT, # use default api_core value for timeout
)
PublisherOptions.__doc__ = "The options for the publisher client."
PublisherOptions.enable_message_ordering.__doc__ = (
Expand All @@ -112,6 +127,14 @@ class LimitExceededBehavior(str, enum.Enum):
"Flow control settings for message publishing by the client. By default "
"the publisher client does not do any throttling."
)
PublisherOptions.retry.__doc__ = (
"Retry settings for message publishing by the client. This should be "
"an instance of :class:`google.api_core.retry.Retry`."
)
PublisherOptions.timeout.__doc__ = (
"Timeout settings for message publishing by the client. It should be compatible "
"with :class:`~.pubsub_v1.types.TimeoutType`."
)

# Define the type class and default values for flow control settings.
#
Expand Down Expand Up @@ -194,6 +217,7 @@ def _get_protobuf_messages(module):
_local_modules = [pubsub_gapic_types]

names = [
"TimeoutType",
"BatchSettings",
"LimitExceededBehavior",
"PublishFlowControl",
Expand Down