Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support customizable retry and timeout settings on the publisher client #299

Merged
merged 19 commits into from Jun 15, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
118 changes: 61 additions & 57 deletions google/pubsub_v1/services/publisher/async_client.py
Expand Up @@ -33,7 +33,7 @@
from google.iam.v1 import policy_pb2 as policy # type: ignore
from google.pubsub_v1.services.publisher import pagers
from google.pubsub_v1.types import pubsub

from google.pubsub_v1.types import TimeoutType
from .transports.base import PublisherTransport, DEFAULT_CLIENT_INFO
from .transports.grpc_asyncio import PublisherGrpcAsyncIOTransport
from .client import PublisherClient
Expand Down Expand Up @@ -178,9 +178,7 @@ async def create_topic(
*,
name: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.Topic:
r"""Creates the given topic with the given name. See the [resource
Expand All @@ -206,7 +204,7 @@ async def create_topic(

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -254,6 +252,9 @@ async def create_topic(
gapic_v1.routing_header.to_grpc_metadata((("name", request.name),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -265,9 +266,7 @@ async def update_topic(
request: pubsub.UpdateTopicRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.Topic:
r"""Updates an existing topic. Note that certain
Expand All @@ -276,10 +275,9 @@ async def update_topic(
Args:
request (:class:`google.pubsub_v1.types.UpdateTopicRequest`):
The request object. Request for the UpdateTopic method.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -315,6 +313,9 @@ async def update_topic(
),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)
Copy link
Contributor

@jimfulton jimfulton May 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need this except for the None case, which is new. I don't think we should accept None. Apologies if None came from me.

int and float are handled here:

https://github.com/googleapis/python-api-core/blob/7337c6b123735fb9ae5d0e54b4399275719c020c/google/api_core/gapic_v1/method.py#L66-L67

The same comment applies to other places where you added this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, of course, not DRY.


# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -328,9 +329,7 @@ async def publish(
topic: str = None,
messages: Sequence[pubsub.PubsubMessage] = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.PublishResponse:
r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
Expand All @@ -352,10 +351,9 @@ async def publish(
This corresponds to the ``messages`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -414,6 +412,9 @@ async def publish(
gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -426,9 +427,7 @@ async def get_topic(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.Topic:
r"""Gets the configuration of a topic.
Expand All @@ -443,10 +442,9 @@ async def get_topic(
This corresponds to the ``topic`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -498,6 +496,9 @@ async def get_topic(
gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -510,9 +511,7 @@ async def list_topics(
*,
project: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicsAsyncPager:
r"""Lists matching topics.
Expand All @@ -527,10 +526,9 @@ async def list_topics(
This corresponds to the ``project`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -586,6 +584,9 @@ async def list_topics(
gapic_v1.routing_header.to_grpc_metadata((("project", request.project),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -604,9 +605,7 @@ async def list_topic_subscriptions(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicSubscriptionsAsyncPager:
r"""Lists the names of the attached subscriptions on this
Expand All @@ -624,10 +623,9 @@ async def list_topic_subscriptions(
This corresponds to the ``topic`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -683,6 +681,9 @@ async def list_topic_subscriptions(
gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -701,9 +702,7 @@ async def list_topic_snapshots(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pagers.ListTopicSnapshotsAsyncPager:
r"""Lists the names of the snapshots on this topic. Snapshots are
Expand All @@ -725,10 +724,9 @@ async def list_topic_snapshots(
This corresponds to the ``topic`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -784,6 +782,9 @@ async def list_topic_snapshots(
gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -802,9 +803,7 @@ async def delete_topic(
*,
topic: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> None:
r"""Deletes the topic with the given name. Returns ``NOT_FOUND`` if
Expand All @@ -825,10 +824,9 @@ async def delete_topic(
This corresponds to the ``topic`` field
on the ``request`` instance; if ``request`` is provided, this
should not be set.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -872,6 +870,9 @@ async def delete_topic(
gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
await rpc(
request, retry=retry, timeout=timeout, metadata=metadata,
Expand All @@ -882,9 +883,7 @@ async def detach_subscription(
request: pubsub.DetachSubscriptionRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> pubsub.DetachSubscriptionResponse:
r"""Detaches a subscription from this topic. All messages retained
Expand All @@ -897,10 +896,9 @@ async def detach_subscription(
request (:class:`google.pubsub_v1.types.DetachSubscriptionRequest`):
The request object. Request for the DetachSubscription
method.

retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -938,6 +936,9 @@ async def detach_subscription(
),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -949,9 +950,7 @@ async def set_iam_policy(
request: iam_policy.SetIamPolicyRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> policy.Policy:
r"""Sets the IAM access control policy on the specified
Expand All @@ -962,7 +961,7 @@ async def set_iam_policy(
method.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -1047,6 +1046,9 @@ async def set_iam_policy(
gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -1058,9 +1060,7 @@ async def get_iam_policy(
request: iam_policy.GetIamPolicyRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> policy.Policy:
r"""Gets the IAM access control policy for a function.
Expand All @@ -1072,7 +1072,7 @@ async def get_iam_policy(
method.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -1157,6 +1157,9 @@ async def get_iam_policy(
gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand All @@ -1168,9 +1171,7 @@ async def test_iam_permissions(
request: iam_policy.TestIamPermissionsRequest = None,
*,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: Union[
timeouts.ConstantTimeout, timeouts.ExponentialTimeout
] = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
) -> iam_policy.TestIamPermissionsResponse:
r"""Tests the specified permissions against the IAM access control
Expand All @@ -1182,7 +1183,7 @@ async def test_iam_permissions(
`TestIamPermissions` method.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (Union[timeouts.ConstantTimeout, timeouts.ExponentialTimeout]):
timeout (TimeoutType):
The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
Expand Down Expand Up @@ -1211,6 +1212,9 @@ async def test_iam_permissions(
gapic_v1.routing_header.to_grpc_metadata((("resource", request.resource),)),
)

if timeout is None or isinstance(timeout, (int, float)):
timeout = timeouts.ConstantTimeout(timeout)

# Send the request.
response = await rpc(request, retry=retry, timeout=timeout, metadata=metadata,)

Expand Down