diff --git a/docs/retry.rst b/docs/retry.rst index 23a7d70f..97a7f2ca 100644 --- a/docs/retry.rst +++ b/docs/retry.rst @@ -4,3 +4,10 @@ Retry .. automodule:: google.api_core.retry :members: :show-inheritance: + +Retry in AsyncIO +---------------- + +.. automodule:: google.api_core.retry_async + :members: + :show-inheritance: diff --git a/google/api_core/exceptions.py b/google/api_core/exceptions.py index eed4ee40..d1459ab2 100644 --- a/google/api_core/exceptions.py +++ b/google/api_core/exceptions.py @@ -444,6 +444,10 @@ def from_grpc_status(status_code, message, **kwargs): return error +def _is_informative_grpc_error(rpc_exc): + return hasattr(rpc_exc, "code") and hasattr(rpc_exc, "details") + + def from_grpc_error(rpc_exc): """Create a :class:`GoogleAPICallError` from a :class:`grpc.RpcError`. @@ -454,7 +458,9 @@ def from_grpc_error(rpc_exc): GoogleAPICallError: An instance of the appropriate subclass of :class:`GoogleAPICallError`. """ - if isinstance(rpc_exc, grpc.Call): + # NOTE(lidiz) All gRPC error shares the parent class grpc.RpcError. + # However, check for grpc.RpcError breaks backward compatibility. + if isinstance(rpc_exc, grpc.Call) or _is_informative_grpc_error(rpc_exc): return from_grpc_status( rpc_exc.code(), rpc_exc.details(), errors=(rpc_exc,), response=rpc_exc ) diff --git a/google/api_core/gapic_v1/__init__.py b/google/api_core/gapic_v1/__init__.py index e7a7a686..e47d2cb5 100644 --- a/google/api_core/gapic_v1/__init__.py +++ b/google/api_core/gapic_v1/__init__.py @@ -12,9 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import sys + from google.api_core.gapic_v1 import client_info from google.api_core.gapic_v1 import config from google.api_core.gapic_v1 import method from google.api_core.gapic_v1 import routing_header __all__ = ["client_info", "config", "method", "routing_header"] + +if sys.version_info >= (3, 6): + from google.api_core.gapic_v1 import config_async # noqa: F401 + __all__.append("config_async") diff --git a/google/api_core/gapic_v1/config.py b/google/api_core/gapic_v1/config.py index 3a3eb15f..2a56cf1b 100644 --- a/google/api_core/gapic_v1/config.py +++ b/google/api_core/gapic_v1/config.py @@ -45,7 +45,7 @@ def _exception_class_for_grpc_status_name(name): return exceptions.exception_class_for_grpc_status(getattr(grpc.StatusCode, name)) -def _retry_from_retry_config(retry_params, retry_codes): +def _retry_from_retry_config(retry_params, retry_codes, retry_impl=retry.Retry): """Creates a Retry object given a gapic retry configuration. Args: @@ -70,7 +70,7 @@ def _retry_from_retry_config(retry_params, retry_codes): exception_classes = [ _exception_class_for_grpc_status_name(code) for code in retry_codes ] - return retry.Retry( + return retry_impl( retry.if_exception_type(*exception_classes), initial=(retry_params["initial_retry_delay_millis"] / _MILLIS_PER_SECOND), maximum=(retry_params["max_retry_delay_millis"] / _MILLIS_PER_SECOND), @@ -110,7 +110,7 @@ def _timeout_from_retry_config(retry_params): MethodConfig = collections.namedtuple("MethodConfig", ["retry", "timeout"]) -def parse_method_configs(interface_config): +def parse_method_configs(interface_config, retry_impl=retry.Retry): """Creates default retry and timeout objects for each method in a gapic interface config. @@ -120,6 +120,8 @@ def parse_method_configs(interface_config): an interface named ``google.example.v1.ExampleService`` you would pass in just that interface's configuration, for example ``gapic_config['interfaces']['google.example.v1.ExampleService']``. + retry_impl (Callable): The constructor that creates a retry decorator + that will be applied to the method based on method configs. Returns: Mapping[str, MethodConfig]: A mapping of RPC method names to their @@ -151,7 +153,7 @@ def parse_method_configs(interface_config): if retry_params_name is not None: retry_params = retry_params_map[retry_params_name] retry_ = _retry_from_retry_config( - retry_params, retry_codes_map[method_params["retry_codes_name"]] + retry_params, retry_codes_map[method_params["retry_codes_name"]], retry_impl ) timeout_ = _timeout_from_retry_config(retry_params) diff --git a/google/api_core/gapic_v1/config_async.py b/google/api_core/gapic_v1/config_async.py new file mode 100644 index 00000000..00e5e240 --- /dev/null +++ b/google/api_core/gapic_v1/config_async.py @@ -0,0 +1,42 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""AsyncIO helpers for loading gapic configuration data. + +The Google API generator creates supplementary configuration for each RPC +method to tell the client library how to deal with retries and timeouts. +""" + +from google.api_core import retry_async +from google.api_core.gapic_v1 import config +from google.api_core.gapic_v1.config import MethodConfig # noqa: F401 + + +def parse_method_configs(interface_config): + """Creates default retry and timeout objects for each method in a gapic + interface config with AsyncIO semantics. + + Args: + interface_config (Mapping): The interface config section of the full + gapic library config. For example, If the full configuration has + an interface named ``google.example.v1.ExampleService`` you would + pass in just that interface's configuration, for example + ``gapic_config['interfaces']['google.example.v1.ExampleService']``. + + Returns: + Mapping[str, MethodConfig]: A mapping of RPC method names to their + configuration. + """ + return config.parse_method_configs( + interface_config, + retry_impl=retry_async.AsyncRetry) diff --git a/google/api_core/retry_async.py b/google/api_core/retry_async.py new file mode 100644 index 00000000..f925c3d3 --- /dev/null +++ b/google/api_core/retry_async.py @@ -0,0 +1,282 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helpers for retrying coroutine functions with exponential back-off. + +The :class:`AsyncRetry` decorator shares most functionality and behavior with +:class:`Retry`, but supports coroutine functions. Please refer to description +of :class:`Retry` for more details. + +By default, this decorator will retry transient +API errors (see :func:`if_transient_error`). For example: + +.. code-block:: python + + @retry_async.AsyncRetry() + async def call_flaky_rpc(): + return await client.flaky_rpc() + + # Will retry flaky_rpc() if it raises transient API errors. + result = await call_flaky_rpc() + +You can pass a custom predicate to retry on different exceptions, such as +waiting for an eventually consistent item to be available: + +.. code-block:: python + + @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound)) + async def check_if_exists(): + return await client.does_thing_exist() + + is_available = await check_if_exists() + +Some client library methods apply retry automatically. These methods can accept +a ``retry`` parameter that allows you to configure the behavior: + +.. code-block:: python + + my_retry = retry_async.AsyncRetry(deadline=60) + result = await client.some_method(retry=my_retry) + +""" + +import asyncio +import datetime +import functools +import logging + +from google.api_core import datetime_helpers, exceptions +from google.api_core.retry import (exponential_sleep_generator, # noqa: F401 + if_exception_type, if_transient_error) + +_LOGGER = logging.getLogger(__name__) +_DEFAULT_INITIAL_DELAY = 1.0 # seconds +_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds +_DEFAULT_DELAY_MULTIPLIER = 2.0 +_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds + + +async def retry_target(target, predicate, sleep_generator, deadline, on_error=None): + """Call a function and retry if it fails. + + This is the lowest-level retry helper. Generally, you'll use the + higher-level retry helper :class:`Retry`. + + Args: + target(Callable): The function to call and retry. This must be a + nullary function - apply arguments with `functools.partial`. + predicate (Callable[Exception]): A callable used to determine if an + exception raised by the target should be considered retryable. + It should return True to retry or False otherwise. + sleep_generator (Iterable[float]): An infinite iterator that determines + how long to sleep between retries. + deadline (float): How long to keep retrying the target. The last sleep + period is shortened as necessary, so that the last retry runs at + ``deadline`` (and not considerably beyond it). + on_error (Callable[Exception]): A function to call while processing a + retryable exception. Any error raised by this function will *not* + be caught. + + Returns: + Any: the return value of the target function. + + Raises: + google.api_core.RetryError: If the deadline is exceeded while retrying. + ValueError: If the sleep generator stops yielding values. + Exception: If the target raises a method that isn't retryable. + """ + deadline_dt = (datetime_helpers.utcnow() + datetime.timedelta(seconds=deadline)) if deadline else None + + last_exc = None + + for sleep in sleep_generator: + try: + if not deadline_dt: + return await target() + else: + return await asyncio.wait_for( + target(), + timeout=(deadline_dt - datetime_helpers.utcnow()).total_seconds() + ) + # pylint: disable=broad-except + # This function explicitly must deal with broad exceptions. + except Exception as exc: + if not predicate(exc) and not isinstance(exc, asyncio.TimeoutError): + raise + last_exc = exc + if on_error is not None: + on_error(exc) + + now = datetime_helpers.utcnow() + + if deadline_dt: + if deadline_dt <= now: + # Chains the raising RetryError with the root cause error, + # which helps observability and debugability. + raise exceptions.RetryError( + "Deadline of {:.1f}s exceeded while calling {}".format( + deadline, target + ), + last_exc, + ) from last_exc + else: + time_to_deadline = (deadline_dt - now).total_seconds() + sleep = min(time_to_deadline, sleep) + + _LOGGER.debug( + "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep) + ) + await asyncio.sleep(sleep) + + raise ValueError("Sleep generator stopped yielding sleep values.") + + +class AsyncRetry: + """Exponential retry decorator for async functions. + + This class is a decorator used to add exponential back-off retry behavior + to an RPC call. + + Although the default behavior is to retry transient API errors, a + different predicate can be provided to retry other exceptions. + + Args: + predicate (Callable[Exception]): A callable that should return ``True`` + if the given exception is retryable. + initial (float): The minimum a,out of time to delay in seconds. This + must be greater than 0. + maximum (float): The maximum amout of time to delay in seconds. + multiplier (float): The multiplier applied to the delay. + deadline (float): How long to keep retrying in seconds. The last sleep + period is shortened as necessary, so that the last retry runs at + ``deadline`` (and not considerably beyond it). + on_error (Callable[Exception]): A function to call while processing + a retryable exception. Any error raised by this function will + *not* be caught. + """ + + def __init__( + self, + predicate=if_transient_error, + initial=_DEFAULT_INITIAL_DELAY, + maximum=_DEFAULT_MAXIMUM_DELAY, + multiplier=_DEFAULT_DELAY_MULTIPLIER, + deadline=_DEFAULT_DEADLINE, + on_error=None, + ): + self._predicate = predicate + self._initial = initial + self._multiplier = multiplier + self._maximum = maximum + self._deadline = deadline + self._on_error = on_error + + def __call__(self, func, on_error=None): + """Wrap a callable with retry behavior. + + Args: + func (Callable): The callable to add retry behavior to. + on_error (Callable[Exception]): A function to call while processing + a retryable exception. Any error raised by this function will + *not* be caught. + + Returns: + Callable: A callable that will invoke ``func`` with retry + behavior. + """ + if self._on_error is not None: + on_error = self._on_error + + @functools.wraps(func) + async def retry_wrapped_func(*args, **kwargs): + """A wrapper that calls target function with retry.""" + target = functools.partial(func, *args, **kwargs) + sleep_generator = exponential_sleep_generator( + self._initial, self._maximum, multiplier=self._multiplier + ) + return await retry_target( + target, + self._predicate, + sleep_generator, + self._deadline, + on_error=on_error, + ) + + return retry_wrapped_func + + def _replace(self, + predicate=None, + initial=None, + maximum=None, + multiplier=None, + deadline=None, + on_error=None): + return AsyncRetry( + predicate=predicate or self._predicate, + initial=initial or self._initial, + maximum=maximum or self._maximum, + multiplier=multiplier or self._multiplier, + deadline=deadline or self._deadline, + on_error=on_error or self._on_error, + ) + + def with_deadline(self, deadline): + """Return a copy of this retry with the given deadline. + + Args: + deadline (float): How long to keep retrying. + + Returns: + AsyncRetry: A new retry instance with the given deadline. + """ + return self._replace(deadline=deadline) + + def with_predicate(self, predicate): + """Return a copy of this retry with the given predicate. + + Args: + predicate (Callable[Exception]): A callable that should return + ``True`` if the given exception is retryable. + + Returns: + AsyncRetry: A new retry instance with the given predicate. + """ + return self._replace(predicate=predicate) + + def with_delay(self, initial=None, maximum=None, multiplier=None): + """Return a copy of this retry with the given delay options. + + Args: + initial (float): The minimum amout of time to delay. This must + be greater than 0. + maximum (float): The maximum amout of time to delay. + multiplier (float): The multiplier applied to the delay. + + Returns: + AsyncRetry: A new retry instance with the given predicate. + """ + return self._replace(initial=initial, maximum=maximum, multiplier=multiplier) + + def __str__(self): + return ( + "".format( + self._predicate, + self._initial, + self._maximum, + self._multiplier, + self._deadline, + self._on_error, + ) + ) diff --git a/noxfile.py b/noxfile.py index dfb12575..989bb9be 100644 --- a/noxfile.py +++ b/noxfile.py @@ -15,10 +15,23 @@ from __future__ import absolute_import import os import shutil +import sys # https://github.com/google/importlab/issues/25 import nox # pytype: disable=import-error +_MINIMAL_ASYNCIO_SUPPORT_PYTHON_VERSION = [3, 6] + + +def _greater_or_equal_than_36(version_string): + tokens = version_string.split('.') + for i, token in enumerate(tokens): + try: + tokens[i] = int(token) + except ValueError: + pass + return tokens >= [3, 6] + def default(session): """Default unit test session. @@ -32,8 +45,9 @@ def default(session): session.install("mock", "pytest", "pytest-cov", "grpcio >= 1.0.2") session.install("-e", ".") - # Run py.test against the unit tests. - session.run( + pytest_args = [ + "python", + "-m", "py.test", "--quiet", "--cov=google.api_core", @@ -43,8 +57,19 @@ def default(session): "--cov-report=", "--cov-fail-under=0", os.path.join("tests", "unit"), - *session.posargs - ) + ] + pytest_args.extend(session.posargs) + + # Inject AsyncIO content, if version >= 3.6. + if _greater_or_equal_than_36(session.python): + session.install("asyncmock", "pytest-asyncio") + + pytest_args.append("--cov=tests.asyncio") + pytest_args.append(os.path.join("tests", "asyncio")) + session.run(*pytest_args) + else: + # Run py.test against the unit tests. + session.run(*pytest_args) @nox.session(python=["2.7", "3.5", "3.6", "3.7", "3.8"]) diff --git a/tests/asyncio/__init__.py b/tests/asyncio/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/asyncio/gapic/test_config_async.py b/tests/asyncio/gapic/test_config_async.py new file mode 100644 index 00000000..1f6ea9e2 --- /dev/null +++ b/tests/asyncio/gapic/test_config_async.py @@ -0,0 +1,87 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.api_core import exceptions +from google.api_core.gapic_v1 import config_async + + +INTERFACE_CONFIG = { + "retry_codes": { + "idempotent": ["DEADLINE_EXCEEDED", "UNAVAILABLE"], + "other": ["FAILED_PRECONDITION"], + "non_idempotent": [], + }, + "retry_params": { + "default": { + "initial_retry_delay_millis": 1000, + "retry_delay_multiplier": 2.5, + "max_retry_delay_millis": 120000, + "initial_rpc_timeout_millis": 120000, + "rpc_timeout_multiplier": 1.0, + "max_rpc_timeout_millis": 120000, + "total_timeout_millis": 600000, + }, + "other": { + "initial_retry_delay_millis": 1000, + "retry_delay_multiplier": 1, + "max_retry_delay_millis": 1000, + "initial_rpc_timeout_millis": 1000, + "rpc_timeout_multiplier": 1, + "max_rpc_timeout_millis": 1000, + "total_timeout_millis": 1000, + }, + }, + "methods": { + "AnnotateVideo": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default", + }, + "Other": { + "timeout_millis": 60000, + "retry_codes_name": "other", + "retry_params_name": "other", + }, + "Plain": {"timeout_millis": 30000}, + }, +} + + +def test_create_method_configs(): + method_configs = config_async.parse_method_configs(INTERFACE_CONFIG) + + retry, timeout = method_configs["AnnotateVideo"] + assert retry._predicate(exceptions.DeadlineExceeded(None)) + assert retry._predicate(exceptions.ServiceUnavailable(None)) + assert retry._initial == 1.0 + assert retry._multiplier == 2.5 + assert retry._maximum == 120.0 + assert retry._deadline == 600.0 + assert timeout._initial == 120.0 + assert timeout._multiplier == 1.0 + assert timeout._maximum == 120.0 + + retry, timeout = method_configs["Other"] + assert retry._predicate(exceptions.FailedPrecondition(None)) + assert retry._initial == 1.0 + assert retry._multiplier == 1.0 + assert retry._maximum == 1.0 + assert retry._deadline == 1.0 + assert timeout._initial == 1.0 + assert timeout._multiplier == 1.0 + assert timeout._maximum == 1.0 + + retry, timeout = method_configs["Plain"] + assert retry is None + assert timeout._timeout == 30.0 diff --git a/tests/asyncio/test_retry_async.py b/tests/asyncio/test_retry_async.py new file mode 100644 index 00000000..8f863668 --- /dev/null +++ b/tests/asyncio/test_retry_async.py @@ -0,0 +1,397 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import re + +import mock +import pytest + +from google.api_core import exceptions +from google.api_core import retry_async + + +@mock.patch("asyncio.sleep", autospec=True) +@mock.patch( + "google.api_core.datetime_helpers.utcnow", + return_value=datetime.datetime.min, + autospec=True, +) +@pytest.mark.asyncio +async def test_retry_target_success(utcnow, sleep): + predicate = retry_async.if_exception_type(ValueError) + call_count = [0] + + async def target(): + call_count[0] += 1 + if call_count[0] < 3: + raise ValueError() + return 42 + + result = await retry_async.retry_target(target, predicate, range(10), None) + + assert result == 42 + assert call_count[0] == 3 + sleep.assert_has_calls([mock.call(0), mock.call(1)]) + + +@mock.patch("asyncio.sleep", autospec=True) +@mock.patch( + "google.api_core.datetime_helpers.utcnow", + return_value=datetime.datetime.min, + autospec=True, +) +@pytest.mark.asyncio +async def test_retry_target_w_on_error(utcnow, sleep): + predicate = retry_async.if_exception_type(ValueError) + call_count = {"target": 0} + to_raise = ValueError() + + async def target(): + call_count["target"] += 1 + if call_count["target"] < 3: + raise to_raise + return 42 + + on_error = mock.Mock() + + result = await retry_async.retry_target(target, predicate, range(10), None, on_error=on_error) + + assert result == 42 + assert call_count["target"] == 3 + + on_error.assert_has_calls([mock.call(to_raise), mock.call(to_raise)]) + sleep.assert_has_calls([mock.call(0), mock.call(1)]) + + +@mock.patch("asyncio.sleep", autospec=True) +@mock.patch( + "google.api_core.datetime_helpers.utcnow", + return_value=datetime.datetime.min, + autospec=True, +) +@pytest.mark.asyncio +async def test_retry_target_non_retryable_error(utcnow, sleep): + predicate = retry_async.if_exception_type(ValueError) + exception = TypeError() + target = mock.Mock(side_effect=exception) + + with pytest.raises(TypeError) as exc_info: + await retry_async.retry_target(target, predicate, range(10), None) + + assert exc_info.value == exception + sleep.assert_not_called() + + +@mock.patch("asyncio.sleep", autospec=True) +@mock.patch("google.api_core.datetime_helpers.utcnow", autospec=True) +@pytest.mark.asyncio +async def test_retry_target_deadline_exceeded(utcnow, sleep): + predicate = retry_async.if_exception_type(ValueError) + exception = ValueError("meep") + target = mock.Mock(side_effect=exception) + # Setup the timeline so that the first call takes 5 seconds but the second + # call takes 6, which puts the retry over the deadline. + utcnow.side_effect = [ + # The first call to utcnow establishes the start of the timeline. + datetime.datetime.min, + datetime.datetime.min + datetime.timedelta(seconds=5), + datetime.datetime.min + datetime.timedelta(seconds=11), + ] + + with pytest.raises(exceptions.RetryError) as exc_info: + await retry_async.retry_target(target, predicate, range(10), deadline=10) + + assert exc_info.value.cause == exception + assert exc_info.match("Deadline of 10.0s exceeded") + assert exc_info.match("last exception: meep") + assert target.call_count == 2 + + +@pytest.mark.asyncio +async def test_retry_target_bad_sleep_generator(): + with pytest.raises(ValueError, match="Sleep generator"): + await retry_async.retry_target(mock.sentinel.target, mock.sentinel.predicate, [], None) + + +class TestAsyncRetry: + + def test_constructor_defaults(self): + retry_ = retry_async.AsyncRetry() + assert retry_._predicate == retry_async.if_transient_error + assert retry_._initial == 1 + assert retry_._maximum == 60 + assert retry_._multiplier == 2 + assert retry_._deadline == 120 + assert retry_._on_error is None + + def test_constructor_options(self): + _some_function = mock.Mock() + + retry_ = retry_async.AsyncRetry( + predicate=mock.sentinel.predicate, + initial=1, + maximum=2, + multiplier=3, + deadline=4, + on_error=_some_function, + ) + assert retry_._predicate == mock.sentinel.predicate + assert retry_._initial == 1 + assert retry_._maximum == 2 + assert retry_._multiplier == 3 + assert retry_._deadline == 4 + assert retry_._on_error is _some_function + + def test_with_deadline(self): + retry_ = retry_async.AsyncRetry( + predicate=mock.sentinel.predicate, + initial=1, + maximum=2, + multiplier=3, + deadline=4, + on_error=mock.sentinel.on_error, + ) + new_retry = retry_.with_deadline(42) + assert retry_ is not new_retry + assert new_retry._deadline == 42 + + # the rest of the attributes should remain the same + assert new_retry._predicate is retry_._predicate + assert new_retry._initial == retry_._initial + assert new_retry._maximum == retry_._maximum + assert new_retry._multiplier == retry_._multiplier + assert new_retry._on_error is retry_._on_error + + def test_with_predicate(self): + retry_ = retry_async.AsyncRetry( + predicate=mock.sentinel.predicate, + initial=1, + maximum=2, + multiplier=3, + deadline=4, + on_error=mock.sentinel.on_error, + ) + new_retry = retry_.with_predicate(mock.sentinel.predicate) + assert retry_ is not new_retry + assert new_retry._predicate == mock.sentinel.predicate + + # the rest of the attributes should remain the same + assert new_retry._deadline == retry_._deadline + assert new_retry._initial == retry_._initial + assert new_retry._maximum == retry_._maximum + assert new_retry._multiplier == retry_._multiplier + assert new_retry._on_error is retry_._on_error + + def test_with_delay_noop(self): + retry_ = retry_async.AsyncRetry( + predicate=mock.sentinel.predicate, + initial=1, + maximum=2, + multiplier=3, + deadline=4, + on_error=mock.sentinel.on_error, + ) + new_retry = retry_.with_delay() + assert retry_ is not new_retry + assert new_retry._initial == retry_._initial + assert new_retry._maximum == retry_._maximum + assert new_retry._multiplier == retry_._multiplier + + def test_with_delay(self): + retry_ = retry_async.AsyncRetry( + predicate=mock.sentinel.predicate, + initial=1, + maximum=2, + multiplier=3, + deadline=4, + on_error=mock.sentinel.on_error, + ) + new_retry = retry_.with_delay(initial=1, maximum=2, multiplier=3) + assert retry_ is not new_retry + assert new_retry._initial == 1 + assert new_retry._maximum == 2 + assert new_retry._multiplier == 3 + + # the rest of the attributes should remain the same + assert new_retry._deadline == retry_._deadline + assert new_retry._predicate is retry_._predicate + assert new_retry._on_error is retry_._on_error + + def test___str__(self): + def if_exception_type(exc): + return bool(exc) # pragma: NO COVER + + # Explicitly set all attributes as changed Retry defaults should not + # cause this test to start failing. + retry_ = retry_async.AsyncRetry( + predicate=if_exception_type, + initial=1.0, + maximum=60.0, + multiplier=2.0, + deadline=120.0, + on_error=None, + ) + assert re.match( + ( + r", " + r"initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, " + r"on_error=None>" + ), + str(retry_), + ) + + @mock.patch("asyncio.sleep", autospec=True) + @pytest.mark.asyncio + async def test___call___and_execute_success(self, sleep): + retry_ = retry_async.AsyncRetry() + target = mock.AsyncMock(spec=["__call__"], return_value=42) + # __name__ is needed by functools.partial. + target.__name__ = "target" + + decorated = retry_(target) + target.assert_not_called() + + result = await decorated("meep") + + assert result == 42 + target.assert_called_once_with("meep") + sleep.assert_not_called() + + # Make uniform return half of its maximum, which is the calculated sleep time. + @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n / 2.0) + @mock.patch("asyncio.sleep", autospec=True) + @pytest.mark.asyncio + async def test___call___and_execute_retry(self, sleep, uniform): + + on_error = mock.Mock(spec=["__call__"], side_effect=[None]) + retry_ = retry_async.AsyncRetry(predicate=retry_async.if_exception_type(ValueError)) + + target = mock.AsyncMock(spec=["__call__"], side_effect=[ValueError(), 42]) + # __name__ is needed by functools.partial. + target.__name__ = "target" + + decorated = retry_(target, on_error=on_error) + target.assert_not_called() + + result = await decorated("meep") + + assert result == 42 + assert target.call_count == 2 + target.assert_has_calls([mock.call("meep"), mock.call("meep")]) + sleep.assert_called_once_with(retry_._initial) + assert on_error.call_count == 1 + + # Make uniform return half of its maximum, which is the calculated sleep time. + @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n / 2.0) + @mock.patch("asyncio.sleep", autospec=True) + @pytest.mark.asyncio + async def test___call___and_execute_retry_hitting_deadline(self, sleep, uniform): + + on_error = mock.Mock(spec=["__call__"], side_effect=[None] * 10) + retry_ = retry_async.AsyncRetry( + predicate=retry_async.if_exception_type(ValueError), + initial=1.0, + maximum=1024.0, + multiplier=2.0, + deadline=9.9, + ) + + utcnow = datetime.datetime.utcnow() + utcnow_patcher = mock.patch( + "google.api_core.datetime_helpers.utcnow", return_value=utcnow + ) + + target = mock.AsyncMock(spec=["__call__"], side_effect=[ValueError()] * 10) + # __name__ is needed by functools.partial. + target.__name__ = "target" + + decorated = retry_(target, on_error=on_error) + target.assert_not_called() + + with utcnow_patcher as patched_utcnow: + # Make sure that calls to fake asyncio.sleep() also advance the mocked + # time clock. + def increase_time(sleep_delay): + patched_utcnow.return_value += datetime.timedelta(seconds=sleep_delay) + sleep.side_effect = increase_time + + with pytest.raises(exceptions.RetryError): + await decorated("meep") + + assert target.call_count == 5 + target.assert_has_calls([mock.call("meep")] * 5) + assert on_error.call_count == 5 + + # check the delays + assert sleep.call_count == 4 # once between each successive target calls + last_wait = sleep.call_args.args[0] + total_wait = sum(call_args.args[0] for call_args in sleep.call_args_list) + + assert last_wait == 2.9 # and not 8.0, because the last delay was shortened + assert total_wait == 9.9 # the same as the deadline + + @mock.patch("asyncio.sleep", autospec=True) + @pytest.mark.asyncio + async def test___init___without_retry_executed(self, sleep): + _some_function = mock.Mock() + + retry_ = retry_async.AsyncRetry( + predicate=retry_async.if_exception_type(ValueError), on_error=_some_function + ) + # check the proper creation of the class + assert retry_._on_error is _some_function + + target = mock.AsyncMock(spec=["__call__"], side_effect=[42]) + # __name__ is needed by functools.partial. + target.__name__ = "target" + + wrapped = retry_(target) + + result = await wrapped("meep") + + assert result == 42 + target.assert_called_once_with("meep") + sleep.assert_not_called() + _some_function.assert_not_called() + + # Make uniform return half of its maximum, which is the calculated sleep time. + @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n / 2.0) + @mock.patch("asyncio.sleep", autospec=True) + @pytest.mark.asyncio + async def test___init___when_retry_is_executed(self, sleep, uniform): + _some_function = mock.Mock() + + retry_ = retry_async.AsyncRetry( + predicate=retry_async.if_exception_type(ValueError), on_error=_some_function + ) + # check the proper creation of the class + assert retry_._on_error is _some_function + + target = mock.AsyncMock( + spec=["__call__"], side_effect=[ValueError(), ValueError(), 42] + ) + # __name__ is needed by functools.partial. + target.__name__ = "target" + + wrapped = retry_(target) + target.assert_not_called() + + result = await wrapped("meep") + + assert result == 42 + assert target.call_count == 3 + assert _some_function.call_count == 2 + target.assert_has_calls([mock.call("meep"), mock.call("meep")]) + sleep.assert_any_call(retry_._initial)