Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support customizable retry and timeout settings on the publisher client #299

Merged
merged 19 commits into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 42 additions & 1 deletion UPGRADING.md
Expand Up @@ -100,7 +100,7 @@ specified by the API producer.
*,
project: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
timeout: google.pubsub_v1.types.TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicsPager:
```
Expand Down Expand Up @@ -161,3 +161,44 @@ The publisher and subscriber clients cannot be constructed with `client_config`
argument anymore. If you want to customize retry and timeout settings for a particular
method, you need to do it upon method invocation by passing the custom `timeout` and
`retry` arguments, respectively.


## Custom Retry and Timeout settings for Publisher Client

The ``publisher_options`` parameter to the Publisher Client, as well as all of the
client's methods, now accept custom retry and timeout settings:

```py
custom_retry = api_core.retry.Retry(
initial=0.250, # seconds (default: 0.1)
maximum=90.0, # seconds (default: 60.0)
multiplier=1.45, # default: 1.3
deadline=300.0, # seconds (default: 60.0)
predicate=api_core.retry.if_exception_type(
api_core.exceptions.Aborted,
api_core.exceptions.DeadlineExceeded,
api_core.exceptions.InternalServerError,
api_core.exceptions.ResourceExhausted,
api_core.exceptions.ServiceUnavailable,
api_core.exceptions.Unknown,
api_core.exceptions.Cancelled,
),
)

custom_timeout=api_core.timeout.ExponentialTimeout(
initial=1.0,
maximum=10.0,
multiplier=1.0,
deadline=300.0,
)

publisher = pubsub_v1.PublisherClient(
publisher_options = pubsub_v1.types.PublisherOptions(
retry=custom_retry,
timeout=custom_timeout,
),
)
```

The timeout can be either an instance of `google.api_core.timeout.ConstantTimeout`,
or an instance of `google.api_core.timeout.ExponentialTimeout`, as in the example.
10 changes: 9 additions & 1 deletion google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -73,6 +73,9 @@ class Batch(base.Batch):
commit_retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried when commiting the batch. If not
provided, a default retry is used.
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when commiting the batch. If not provided, a
default timeout is used.
"""

def __init__(
Expand All @@ -83,6 +86,7 @@ def __init__(
batch_done_callback=None,
commit_when_full=True,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
self._client = client
self._topic = topic
Expand All @@ -106,6 +110,7 @@ def __init__(
self._size = self._base_request_size

self._commit_retry = commit_retry
self._commit_timeout = commit_timeout

@staticmethod
def make_lock():
Expand Down Expand Up @@ -259,7 +264,10 @@ def _commit(self):
try:
# Performs retries for errors defined by the retry configuration.
response = self._client.api.publish(
topic=self._topic, messages=self._messages, retry=self._commit_retry
topic=self._topic,
messages=self._messages,
retry=self._commit_retry,
timeout=self._commit_timeout,
)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, even after retries, so set the exception on
Expand Down
8 changes: 7 additions & 1 deletion google/cloud/pubsub_v1/publisher/_sequencer/base.py
Expand Up @@ -16,6 +16,8 @@

import abc

from google.pubsub_v1 import types as gapic_types


class Sequencer(metaclass=abc.ABCMeta):
"""The base class for sequencers for Pub/Sub publishing. A sequencer
Expand Down Expand Up @@ -45,14 +47,18 @@ def unpause(self, message): # pragma: NO COVER

@staticmethod
@abc.abstractmethod
def publish(self, message, retry=None): # pragma: NO COVER
def publish(
self, message, retry=None, timeout: gapic_types.TimeoutType = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use DEFAULT rather than None? The implementation defaults to DEFAULT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, the ABC should be consistent.

): # pragma: NO COVER
""" Publish message for this ordering key.

Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.

Returns:
A class instance that conforms to Python Standard library's
Expand Down
25 changes: 21 additions & 4 deletions google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py
Expand Up @@ -21,6 +21,7 @@
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types


class _OrderedSequencerStatus(str, enum.Enum):
Expand Down Expand Up @@ -226,13 +227,19 @@ def unpause(self):
raise RuntimeError("Ordering key is not paused.")
self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.

Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
client=self._client,
Expand All @@ -241,16 +248,24 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
batch_done_callback=self._batch_done_callback,
commit_when_full=False,
commit_retry=commit_retry,
commit_timeout=commit_timeout,
)

def publish(self, message, retry=gapic_v1.method.DEFAULT):
def publish(
self,
message,
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Publish message for this ordering key.

Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.

Returns:
A class instance that conforms to Python Standard library's
Expand Down Expand Up @@ -287,13 +302,15 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
), "Publish is only allowed in accepting-messages state."

if not self._ordered_batches:
new_batch = self._create_batch(commit_retry=retry)
new_batch = self._create_batch(
commit_retry=retry, commit_timeout=timeout
)
self._ordered_batches.append(new_batch)

batch = self._ordered_batches[-1]
future = batch.publish(message)
while future is None:
batch = self._create_batch(commit_retry=retry)
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._ordered_batches.append(batch)
future = batch.publish(message)

Expand Down
23 changes: 19 additions & 4 deletions google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py
Expand Up @@ -15,6 +15,7 @@
from google.api_core import gapic_v1

from google.cloud.pubsub_v1.publisher._sequencer import base
from google.pubsub_v1 import types as gapic_types


class UnorderedSequencer(base.Sequencer):
Expand Down Expand Up @@ -77,13 +78,19 @@ def unpause(self):
""" Not relevant for this class. """
raise NotImplementedError

def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.

Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
client=self._client,
Expand All @@ -92,16 +99,24 @@ def _create_batch(self, commit_retry=gapic_v1.method.DEFAULT):
batch_done_callback=None,
commit_when_full=True,
commit_retry=commit_retry,
commit_timeout=commit_timeout,
)

def publish(self, message, retry=gapic_v1.method.DEFAULT):
def publish(
self,
message,
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Batch message into existing or new batch.

Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.

Returns:
~google.api_core.future.Future: An object conforming to
Expand All @@ -119,7 +134,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
raise RuntimeError("Unordered sequencer already stopped.")

if not self._current_batch:
newbatch = self._create_batch(commit_retry=retry)
newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
self._current_batch = newbatch

batch = self._current_batch
Expand All @@ -129,7 +144,7 @@ def publish(self, message, retry=gapic_v1.method.DEFAULT):
future = batch.publish(message)
# batch is full, triggering commit_when_full
if future is None:
batch = self._create_batch(commit_retry=retry)
batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
# At this point, we lose track of the old batch, but we don't
# care since it's already committed (because it was full.)
self._current_batch = batch
Expand Down
24 changes: 21 additions & 3 deletions google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -230,7 +230,13 @@ def resume_publish(self, topic, ordering_key):
sequencer.unpause()

def publish(
self, topic, data, ordering_key="", retry=gapic_v1.method.DEFAULT, **attrs
self,
topic,
data,
ordering_key="",
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
**attrs
):
"""Publish a single message.

Expand Down Expand Up @@ -267,8 +273,14 @@ def publish(
publish order should be respected. Message ordering must be
enabled for this client to use this feature.
retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried. If `ordering_key` is specified,
errors, if any, should be retried. If `ordering_key` is specified,\
plamut marked this conversation as resolved.
Show resolved Hide resolved
the total retry deadline will be changed to "infinity".
If given, it overides any retry passed into the client through
the ``publisher_options`` argument.
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout for the RPC request. Can be used to override any timeout
passed in through ``publisher_options`` when instantiating the client.

attrs (Mapping[str, str]): A dictionary of attributes to be
sent as metadata. (These may be text strings or byte strings.)

Expand Down Expand Up @@ -331,6 +343,12 @@ def publish(
def on_publish_done(future):
self._flow_controller.release(message)

if retry is gapic_v1.method.DEFAULT: # if custom retry not passed in
retry = self.publisher_options.retry

if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in
timeout = self.publisher_options.timeout

with self._batch_lock:
if self._is_stopped:
raise RuntimeError("Cannot publish on a stopped publisher.")
Expand All @@ -347,7 +365,7 @@ def on_publish_done(future):

# Delegate the publishing to the sequencer.
sequencer = self._get_or_create_sequencer(topic, ordering_key)
future = sequencer.publish(message, retry=retry)
future = sequencer.publish(message, retry=retry, timeout=timeout)
future.add_done_callback(on_publish_done)

# Create a timer thread if necessary to enforce the batching
Expand Down
13 changes: 12 additions & 1 deletion google/cloud/pubsub_v1/types.py
Expand Up @@ -22,6 +22,7 @@
import proto

from google.api import http_pb2
from google.api_core import gapic_v1
from google.iam.v1 import iam_policy_pb2
from google.iam.v1 import policy_pb2
from google.iam.v1.logging import audit_data_pb2
Expand Down Expand Up @@ -98,11 +99,13 @@ class LimitExceededBehavior(str, enum.Enum):
# This class is used when creating a publisher client to pass in options
# to enable/disable features.
PublisherOptions = collections.namedtuple(
"PublisherConfig", ["enable_message_ordering", "flow_control"]
"PublisherOptions", ["enable_message_ordering", "flow_control", "retry", "timeout"]
)
PublisherOptions.__new__.__defaults__ = (
False, # enable_message_ordering: False
PublishFlowControl(), # default flow control settings
gapic_v1.method.DEFAULT, # use default api_core value for retry
gapic_v1.method.DEFAULT, # use default api_core value for timeout
)
PublisherOptions.__doc__ = "The options for the publisher client."
PublisherOptions.enable_message_ordering.__doc__ = (
Expand All @@ -112,6 +115,14 @@ class LimitExceededBehavior(str, enum.Enum):
"Flow control settings for message publishing by the client. By default "
"the publisher client does not do any throttling."
)
PublisherOptions.retry.__doc__ = (
"Retry settings for message publishing by the client. This should be "
"an instance of :class:`google.api_core.retry.Retry`."
)
PublisherOptions.timeout.__doc__ = (
"Timeout settings for message publishing by the client. It should be compatible "
"with :class:`~.pubsub_v1.types.TimeoutType`."
)

# Define the type class and default values for flow control settings.
#
Expand Down