Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support customizable retry and timeout settings on the publisher client #299

Merged
merged 19 commits into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 2 additions & 5 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -73,10 +73,7 @@ class Batch(base.Batch):
commit_retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried when commiting the batch. If not
provided, a default retry is used.
commit_timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when commiting the batch. If not provided, a
default timeout is used.
"""
Expand All @@ -89,7 +86,7 @@ def __init__(
batch_done_callback=None,
commit_when_full=True,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
self._client = client
self._topic = topic
Expand Down
11 changes: 6 additions & 5 deletions google/cloud/pubsub_v1/publisher/_sequencer/base.py
Expand Up @@ -16,6 +16,8 @@

import abc

from google.pubsub_v1 import types as gapic_types


class Sequencer(metaclass=abc.ABCMeta):
"""The base class for sequencers for Pub/Sub publishing. A sequencer
Expand Down Expand Up @@ -45,18 +47,17 @@ def unpause(self, message): # pragma: NO COVER

@staticmethod
@abc.abstractmethod
def publish(self, message, retry=None, timeout=None): # pragma: NO COVER
def publish(
self, message, retry=None, timeout: gapic_types.TimeoutType = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use DEFAULT rather than None? The implementation defaults to DEFAULT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, the ABC should be consistent.

): # pragma: NO COVER
""" Publish message for this ordering key.

Args:
message (~.pubsub_v1.types.PubsubMessage):
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.

Returns:
Expand Down
18 changes: 8 additions & 10 deletions google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py
Expand Up @@ -21,6 +21,7 @@
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types


class _OrderedSequencerStatus(str, enum.Enum):
Expand Down Expand Up @@ -229,18 +230,15 @@ def unpause(self):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.

Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
Expand All @@ -254,7 +252,10 @@ def _create_batch(
)

def publish(
self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT
self,
message,
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Publish message for this ordering key.

Expand All @@ -263,10 +264,7 @@ def publish(
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.

Returns:
Expand Down
18 changes: 8 additions & 10 deletions google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py
Expand Up @@ -15,6 +15,7 @@
from google.api_core import gapic_v1

from google.cloud.pubsub_v1.publisher._sequencer import base
from google.pubsub_v1 import types as gapic_types


class UnorderedSequencer(base.Sequencer):
Expand Down Expand Up @@ -80,18 +81,15 @@ def unpause(self):
def _create_batch(
self,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Create a new batch using the client's batch class and other stored
settings.

Args:
commit_retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the batch.
commit_timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the batch.
"""
return self._client._batch_class(
Expand All @@ -105,7 +103,10 @@ def _create_batch(
)

def publish(
self, message, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT
self,
message,
retry=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
):
""" Batch message into existing or new batch.

Expand All @@ -114,10 +115,7 @@ def publish(
The Pub/Sub message.
retry (Optional[google.api_core.retry.Retry]):
The retry settings to apply when publishing the message.
timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply when publishing the message.

Returns:
Expand Down
10 changes: 3 additions & 7 deletions google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -235,7 +235,7 @@ def publish(
data,
ordering_key="",
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
**attrs
):
"""Publish a single message.
Expand Down Expand Up @@ -273,15 +273,11 @@ def publish(
publish order should be respected. Message ordering must be
enabled for this client to use this feature.
retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried. If `ordering_key` is specified,
errors, if any, should be retried. If `ordering_key` is specified,\
plamut marked this conversation as resolved.
Show resolved Hide resolved
the total retry deadline will be changed to "infinity".
If given, it overides any retry passed into the client through
the ``publisher_options`` argument.

timeout (Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]):
timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout for the RPC request. Can be used to override any timeout
passed in through ``publisher_options`` when instantiating the client.

Expand Down
13 changes: 0 additions & 13 deletions google/cloud/pubsub_v1/types.py
Expand Up @@ -18,12 +18,10 @@
import enum
import inspect
import sys
from typing import Union

import proto

from google.api import http_pb2
import google.api_core
from google.api_core import gapic_v1
from google.iam.v1 import iam_policy_pb2
from google.iam.v1 import policy_pb2
Expand All @@ -39,16 +37,6 @@
from google.pubsub_v1.types import pubsub as pubsub_gapic_types


TimeoutType = Union[
None,
int,
float,
google.api_core.timeout.ConstantTimeout,
google.api_core.timeout.ExponentialTimeout,
]
"""The type of the timeout parameter of publisher client methods."""


# Define the default values for batching.
#
# This class is used when creating a publisher or subscriber client, and
Expand Down Expand Up @@ -217,7 +205,6 @@ def _get_protobuf_messages(module):
_local_modules = [pubsub_gapic_types]

names = [
"TimeoutType",
"BatchSettings",
"LimitExceededBehavior",
"PublishFlowControl",
Expand Down
7 changes: 2 additions & 5 deletions tests/unit/pubsub_v1/publisher/batch/test_thread.py
Expand Up @@ -42,7 +42,7 @@ def create_batch(
batch_done_callback=None,
commit_when_full=True,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
**batch_settings
):
"""Return a batch object suitable for testing.
Expand All @@ -55,10 +55,7 @@ def create_batch(
has reached byte-size or number-of-messages limits.
commit_retry (Optional[google.api_core.retry.Retry]): The retry settings
for the batch commit call.
commit_timeout (Optional[Union[ \
googole.api_core.timeout.ConstantTimeout, \
googole.api_core.timeout.ExponentialTimeout \
]]):
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply to the batch commit call.
batch_settings (Mapping[str, str]): Arguments passed on to the
:class:``~.pubsub_v1.types.BatchSettings`` constructor.
Expand Down