Skip to content

Commit

Permalink
feat: support customizable retry and timeout settings on the publishe…
Browse files Browse the repository at this point in the history
…r client (#299)

* feat: allow retry and timeout settings on publisher client

* build: update generated code and update doc

* Propagate publish timeout to the RPC call

* test publisher client

* Fix timeout parameter type in docstrings

* Apply timeout changes to async publisher client

* Introduce TimeoutType type alias

* Update PublisherOptions docs

* Use type alias for timeout in upgrading guide

* Widen timeout types in generated publisher clients

* Use TimeoutType in handwritten code

* Remove redundant backslash

* Use DEFAULT as a defualt timeout in base sequencer

* Do not accept plain None as a valid timeout

Using no timeout is not a good idea, but if one really wants to,
they can pass it in as ConstantTimeout(None).

As a side effect, the logic of converting a constant into a
COnstantTimeout instance can be removed, as this is already handled
in api-core for int and float values.

Co-authored-by: Carlos de la Guardia <cguardia@yahoo.com>
  • Loading branch information
plamut and cguardia committed Jun 15, 2021
1 parent 691a3dd commit 7597604
Show file tree
Hide file tree
Showing 15 changed files with 357 additions and 78 deletions.
43 changes: 42 additions & 1 deletion UPGRADING.md
Expand Up @@ -100,7 +100,7 @@ specified by the API producer.
*,
project: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: google.pubsub_v1.types.TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicsPager:
```
Expand Down Expand Up @@ -161,3 +161,44 @@ The publisher and subscriber clients cannot be constructed with `client_config`
argument anymore. If you want to customize retry and timeout settings for a particular
method, you need to do it upon method invocation by passing the custom `timeout` and
`retry` arguments, respectively.


## Custom Retry and Timeout settings for Publisher Client

The ``publisher_options`` parameter to the Publisher Client, as well as all of the
client's methods, now accept custom retry and timeout settings:

```py
custom_retry = api_core.retry.Retry(
initial=0.250, # seconds (default: 0.1)
maximum=90.0, # seconds (default: 60.0)
multiplier=1.45, # default: 1.3
deadline=300.0, # seconds (default: 60.0)
predicate=api_core.retry.if_exception_type(
api_core.exceptions.Aborted,
api_core.exceptions.DeadlineExceeded,
api_core.exceptions.InternalServerError,
api_core.exceptions.ResourceExhausted,
api_core.exceptions.ServiceUnavailable,
api_core.exceptions.Unknown,
api_core.exceptions.Cancelled,
),
)

custom_timeout=api_core.timeout.ExponentialTimeout(
initial=1.0,
maximum=10.0,
multiplier=1.0,
deadline=300.0,
)

publisher = pubsub_v1.PublisherClient(
publisher_options = pubsub_v1.types.PublisherOptions(
retry=custom_retry,
timeout=custom_timeout,
),
)
```

The timeout can be either an instance of `google.api_core.timeout.ConstantTimeout`,
or an instance of `google.api_core.timeout.ExponentialTimeout`, as in the example.
10 changes: 9 additions & 1 deletion google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -73,6 +73,9 @@ class Batch(base.Batch):
commit_retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried when commiting the batch. If not
provided, a default retry is used.
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when commiting the batch. If not provided, a
default timeout is used.
"""

def __init__(
Expand All @@ -83,6 +86,7 @@ def __init__(
batch_done_callback=None,
commit_when_full=True,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
self._client = client
self._topic = topic
Expand All @@ -106,6 +110,7 @@ def __init__(
self._size = self._base_request_size

self._commit_retry = commit_retry
self._commit_timeout = commit_timeout

@staticmethod
def make_lock():
Expand Down Expand Up @@ -261,7 +266,10 @@ def _commit(self):
try:
# Performs retries for errors defined by the retry configuration.
response = self._client.api.publish(
topic=self._topic, messages=self._messages, retry=self._commit_retry
topic=self._topic,
messages=self._messages,
retry=self._commit_retry,
timeout=self._commit_timeout,
)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, even after retries, so set the exception on
Expand Down
12 changes: 11 additions & 1 deletion google/cloud/pubsub_v1/publisher/_sequencer/base.py
Expand Up @@ -16,6 +16,9 @@

import abc

from google.api_core import gapic_v1
from google.pubsub_v1 import types as gapic_types


class Sequencer(metaclass=abc.ABCMeta):
"""The base class for sequencers for Pub/Sub publishing. A sequencer
Expand Down Expand Up @@ -45,14 +48,21 @@ def unpause(self, message): # pragma: NO COVER

@staticmethod
@abc.abstractmethod
def publish(self, message, retry=None): # pragma: NO COVER
def publish(
self,
message,
retry=None,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
): # pragma: NO COVER
""" Publish message for this ordering key.
Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.
Returns:
A class instance that conforms to Python Standard library's
Expand Down
25 changes: 21 additions & 4 deletions google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py
Expand Up @@ -21,6 +21,7 @@
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types


class _OrderedSequencerStatus(str, enum.Enum):
Expand Down Expand Up @@ -226,13 +227,19 @@ def unpause(self):
raise RuntimeError("Ordering key is not paused.")
self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.
Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
client=self._client,
Expand All @@ -241,16 +248,24 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
batch_done_callback=self._batch_done_callback,
commit_when_full=False,
commit_retry=commit_retry,
commit_timeout=commit_timeout,
)

def publish(self, message, retry=gapic_v1.method.DEFAULT):
def publish(
self,
message,
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Publish message for this ordering key.
Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.
Returns:
A class instance that conforms to Python Standard library's
Expand Down Expand Up @@ -287,13 +302,15 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
), "Publish is only allowed in accepting-messages state."

if not self._ordered_batches:
new_batch = self._create_batch(commit_retry=retry)
new_batch = self._create_batch(
commit_retry=retry, commit_timeout=timeout
)
self._ordered_batches.append(new_batch)

batch = self._ordered_batches[-1]
future = batch.publish(message)
while future is None:
batch = self._create_batch(commit_retry=retry)
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._ordered_batches.append(batch)
future = batch.publish(message)

Expand Down
23 changes: 19 additions & 4 deletions google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py
Expand Up @@ -15,6 +15,7 @@
from google.api_core import gapic_v1

from google.cloud.pubsub_v1.publisher._sequencer import base
from google.pubsub_v1 import types as gapic_types


class UnorderedSequencer(base.Sequencer):
Expand Down Expand Up @@ -77,13 +78,19 @@ def unpause(self):
""" Not relevant for this class. """
raise NotImplementedError

def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.
Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
client=self._client,
Expand All @@ -92,16 +99,24 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
batch_done_callback=None,
commit_when_full=True,
commit_retry=commit_retry,
commit_timeout=commit_timeout,
)

def publish(self, message, retry=gapic_v1.method.DEFAULT):
def publish(
self,
message,
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Batch message into existing or new batch.
Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.
Returns:
~google.api_core.future.Future: An object conforming to
Expand All @@ -119,7 +134,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
raise RuntimeError("Unordered sequencer already stopped.")

if not self._current_batch:
newbatch = self._create_batch(commit_retry=retry)
newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._current_batch = newbatch

batch = self._current_batch
Expand All @@ -129,7 +144,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
future = batch.publish(message)
# batch is full, triggering commit_when_full
if future is None:
batch = self._create_batch(commit_retry=retry)
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
# At this point, we lose track of the old batch, but we don't
# care since it's already committed (because it was full.)
self._current_batch = batch
Expand Down
22 changes: 20 additions & 2 deletions google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -230,7 +230,13 @@ def resume_publish(self, topic, ordering_key):
sequencer.unpause()

def publish(
self, topic, data, ordering_key="", retry=gapic_v1.method.DEFAULT, **attrs
self,
topic,
data,
ordering_key="",
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
**attrs
):
"""Publish a single message.
Expand Down Expand Up @@ -269,6 +275,12 @@ def publish(
retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried. If `ordering_key` is specified,
the total retry deadline will be changed to "infinity".
If given, it overides any retry passed into the client through
the ``publisher_options`` argument.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout for the RPC request. Can be used to override any timeout
passed in through ``publisher_options`` when instantiating the client.
attrs (Mapping[str, str]): A dictionary of attributes to be
sent as metadata. (These may be text strings or byte strings.)
Expand Down Expand Up @@ -331,6 +343,12 @@ def publish(
def on_publish_done(future):
self._flow_controller.release(message)

if retry is gapic_v1.method.DEFAULT: # if custom retry not passed in
retry = self.publisher_options.retry

if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in
timeout = self.publisher_options.timeout

with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")
Expand All @@ -347,7 +365,7 @@ def on_publish_done(future):

# Delegate the publishing to the sequencer.
sequencer = self._get_or_create_sequencer(topic, ordering_key)
future = sequencer.publish(message, retry=retry)
future = sequencer.publish(message, retry=retry, timeout=timeout)
future.add_done_callback(on_publish_done)

# Create a timer thread if necessary to enforce the batching
Expand Down
13 changes: 12 additions & 1 deletion google/cloud/pubsub_v1/types.py
Expand Up @@ -22,6 +22,7 @@
import proto

from google.api import http_pb2
from google.api_core import gapic_v1
from google.iam.v1 import iam_policy_pb2
from google.iam.v1 import policy_pb2
from google.iam.v1.logging import audit_data_pb2
Expand Down Expand Up @@ -98,11 +99,13 @@ class LimitExceededBehavior(str, enum.Enum):
# This class is used when creating a publisher client to pass in options
# to enable/disable features.
PublisherOptions = collections.namedtuple(
"PublisherConfig", ["enable_message_ordering", "flow_control"]
"PublisherOptions", ["enable_message_ordering", "flow_control", "retry", "timeout"]
)
PublisherOptions.__new__.__defaults__ = (
False, # enable_message_ordering: False
PublishFlowControl(), # default flow control settings
gapic_v1.method.DEFAULT, # use default api_core value for retry
gapic_v1.method.DEFAULT, # use default api_core value for timeout
)
PublisherOptions.__doc__ = "The options for the publisher client."
PublisherOptions.enable_message_ordering.__doc__ = (
Expand All @@ -112,6 +115,14 @@ class LimitExceededBehavior(str, enum.Enum):
"Flow control settings for message publishing by the client. By default "
"the publisher client does not do any throttling."
)
PublisherOptions.retry.__doc__ = (
"Retry settings for message publishing by the client. This should be "
"an instance of :class:`google.api_core.retry.Retry`."
)
PublisherOptions.timeout.__doc__ = (
"Timeout settings for message publishing by the client. It should be compatible "
"with :class:`~.pubsub_v1.types.TimeoutType`."
)

# Define the type class and default values for flow control settings.
#
Expand Down

0 comments on commit 7597604

Please sign in to comment.