Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: media operation retries can be configured using the same interface as with non-media operations #447

Merged
merged 9 commits into from Jun 11, 2021
41 changes: 41 additions & 0 deletions google/cloud/storage/_helpers.py
Expand Up @@ -23,6 +23,7 @@
import os

from six.moves.urllib.parse import urlsplit
from google import resumable_media
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
Expand All @@ -45,6 +46,12 @@
("if_source_metageneration_not_match", "ifSourceMetagenerationNotMatch"),
)

# Deprecation warning text emitted when callers pass the legacy
# ``num_retries`` argument instead of the newer ``retry`` argument.
_NUM_RETRIES_MESSAGE = (
    "`num_retries` has been deprecated and will be removed in a future "
    "release. Use the `retry` argument with a Retry or ConditionalRetryPolicy "
    "object, or None, instead."
)


def _get_storage_host():
    """Return the storage host to use.

    Reads the emulator host from the ``STORAGE_EMULATOR_ENV_VAR``
    environment variable, falling back to ``_DEFAULT_STORAGE_HOST``
    when the variable is unset.

    :rtype: str
    :returns: The storage host URL.
    """
    return os.environ.get(STORAGE_EMULATOR_ENV_VAR, _DEFAULT_STORAGE_HOST)
Expand Down Expand Up @@ -524,3 +531,37 @@ def _bucket_bound_hostname_url(host, scheme=None):
return host

return "{scheme}://{host}/".format(scheme=scheme, host=host)


def _api_core_retry_to_resumable_media_retry(retry, num_retries=None):
    """Convert a google.api_core.retry.Retry to a
    google.resumable_media.RetryStrategy.

    Custom predicates are not translated; only the backoff/deadline
    configuration (maximum sleep, cumulative deadline, initial delay, and
    multiplier) is copied from the input.

    :type retry: google.api_core.retry.Retry
    :param retry: (Optional) The google.api_core.retry.Retry object to
        translate.

    :type num_retries: int
    :param num_retries: (Optional) The number of retries desired. This is
        supported for backwards compatibility and is mutually exclusive with
        `retry`.

    :rtype: google.resumable_media.RetryStrategy
    :returns: A RetryStrategy with all applicable attributes copied from input,
        or a RetryStrategy with max_retries set to 0 if None was input.

    :raises ValueError: If both ``retry`` and ``num_retries`` are set.
    """

    if retry is not None and num_retries is not None:
        raise ValueError("num_retries and retry arguments are mutually exclusive")

    elif retry is not None:
        # NOTE: these are private attributes of Retry; custom predicates on
        # the Retry object cannot be carried over to RetryStrategy.
        return resumable_media.RetryStrategy(
            max_sleep=retry._maximum,
            max_cumulative_retry=retry._deadline,
            initial_delay=retry._initial,
            multiplier=retry._multiplier,
        )
    elif num_retries is not None:
        # Legacy path: honor the deprecated explicit retry count.
        return resumable_media.RetryStrategy(max_retries=num_retries)
    else:
        # No retry configuration at all: disable retries entirely.
        return resumable_media.RetryStrategy(max_retries=0)
350 changes: 320 additions & 30 deletions google/cloud/storage/blob.py

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions google/cloud/storage/client.py
Expand Up @@ -53,6 +53,7 @@
from google.cloud.storage.acl import DefaultObjectACL
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import ConditionalRetryPolicy


_marker = object()
Expand Down Expand Up @@ -972,6 +973,7 @@ def download_blob_to_file(
if_metageneration_not_match=None,
timeout=_DEFAULT_TIMEOUT,
checksum="md5",
retry=DEFAULT_RETRY,
):
"""Download the contents of a blob object or blob URI into a file-like object.

Expand Down Expand Up @@ -1021,6 +1023,27 @@ def download_blob_to_file(
downloads where chunk_size is set) an INFO-level log will be
emitted. Supported values are "md5", "crc32c" and None. The default
is "md5".
retry (google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy)
(Optional) How to retry the RPC. A None value will disable
retries. A google.api_core.retry.Retry value will enable retries,
and the object will define retriable response codes and errors and
configure backoff and timeout options.

A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
Retry object and activates it only if certain conditions are met.
This class exists to provide safe defaults for RPC calls that are
not technically safe to retry normally (due to potential data
duplication or other side-effects) but become safe to retry if a
condition such as if_metageneration_match is set.

See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.

Media operations (downloads and uploads) do not support non-default
predicates in a Retry object. The default will always be used. Other
configuration changes for Retry objects such as delays and deadlines
are respected.

Examples:
Download a blob using a blob resource.
Expand All @@ -1046,6 +1069,19 @@ def download_blob_to_file(


"""

# Handle ConditionalRetryPolicy.
if isinstance(retry, ConditionalRetryPolicy):
# Conditional retries are designed for non-media calls, which change
# arguments into query_params dictionaries. Media operations work
# differently, so here we make a "fake" query_params to feed to the
# ConditionalRetryPolicy.
query_params = {
"ifGenerationMatch": if_generation_match,
"ifMetagenerationMatch": if_metageneration_match,
}
retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

if not isinstance(blob_or_uri, Blob):
blob_or_uri = Blob.from_string(blob_or_uri)
download_url = blob_or_uri._get_download_url(
Expand All @@ -1070,6 +1106,7 @@ def download_blob_to_file(
raw_download,
timeout=timeout,
checksum=checksum,
retry=retry,
)
except resumable_media.InvalidResponse as exc:
_raise_from_invalid_response(exc)
Expand Down Expand Up @@ -1222,6 +1259,7 @@ def list_blobs(
max_results=max_results,
extra_params=extra_params,
page_start=_blobs_page_start,
timeout=timeout,
tseaver marked this conversation as resolved.
Show resolved Hide resolved
)
iterator.bucket = bucket
iterator.prefixes = set()
Expand Down
103 changes: 89 additions & 14 deletions google/cloud/storage/fileio.py
Expand Up @@ -13,8 +13,14 @@
# limitations under the License.

import io
import warnings

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
from google.cloud.storage.retry import ConditionalRetryPolicy


# Resumable uploads require a chunk size of precisely a multiple of 256 KiB.
CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB
Expand All @@ -28,20 +34,22 @@
"if_metageneration_match",
"if_metageneration_not_match",
"timeout",
"retry",
}

# Valid keyword arguments for upload methods.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
"content_type",
"num_retries",
"predefined_acl",
"num_retries",
"if_generation_match",
"if_generation_not_match",
"if_metageneration_match",
"if_metageneration_not_match",
"timeout",
"checksum",
"retry",
}


Expand All @@ -58,13 +66,35 @@ class BlobReader(io.BufferedIOBase):
bytes than the chunk_size are requested, the remainder is buffered.
The default is the chunk_size of the blob, or 40MiB.

:type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
:param retry: (Optional) How to retry the RPC. A None value will disable
retries. A google.api_core.retry.Retry value will enable retries,
and the object will define retriable response codes and errors and
configure backoff and timeout options.

A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
Retry object and activates it only if certain conditions are met.
This class exists to provide safe defaults for RPC calls that are
not technically safe to retry normally (due to potential data
duplication or other side-effects) but become safe to retry if a
condition such as if_metageneration_match is set.

See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.

Media operations (downloads and uploads) do not support non-default
predicates in a Retry object. The default will always be used. Other
configuration changes for Retry objects such as delays and deadlines
are respected.

:param download_kwargs: Keyword arguments to pass to the underlying API
calls. The following arguments are supported: "if_generation_match",
"if_generation_not_match", "if_metageneration_match",
"if_metageneration_not_match", "timeout".
"""

def __init__(self, blob, chunk_size=None, **download_kwargs):
def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
"""docstring note that download_kwargs also used for reload()"""
for kwarg in download_kwargs:
if kwarg not in VALID_DOWNLOAD_KWARGS:
Expand All @@ -76,6 +106,7 @@ def __init__(self, blob, chunk_size=None, **download_kwargs):
self._pos = 0
self._buffer = io.BytesIO()
self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
self._retry = retry
self._download_kwargs = download_kwargs

def read(self, size=-1):
Expand All @@ -102,6 +133,7 @@ def read(self, size=-1):
start=fetch_start,
end=fetch_end,
checksum=None,
retry=self._retry,
**self._download_kwargs
)
except RequestRangeNotSatisfiable:
Expand Down Expand Up @@ -197,14 +229,43 @@ class BlobWriter(io.BufferedIOBase):
changes the behavior of flush() to conform to TextIOWrapper's
expectations.

:type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
:param retry: (Optional) How to retry the RPC. A None value will disable
retries. A google.api_core.retry.Retry value will enable retries,
and the object will define retriable response codes and errors and
configure backoff and timeout options.

A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
Retry object and activates it only if certain conditions are met.
This class exists to provide safe defaults for RPC calls that are
not technically safe to retry normally (due to potential data
duplication or other side-effects) but become safe to retry if a
condition such as if_metageneration_match is set.

See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.

Media operations (downloads and uploads) do not support non-default
predicates in a Retry object. The default will always be used. Other
configuration changes for Retry objects such as delays and deadlines
are respected.

:param upload_kwargs: Keyword arguments to pass to the underlying API
calls. The following arguments are supported: "if_generation_match",
"if_generation_not_match", "if_metageneration_match",
"if_metageneration_not_match", "timeout", "content_type",
"num_retries", "predefined_acl", "checksum".
"""

def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs):
def __init__(
self,
blob,
chunk_size=None,
text_mode=False,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
**upload_kwargs
):
for kwarg in upload_kwargs:
if kwarg not in VALID_UPLOAD_KWARGS:
raise ValueError(
Expand All @@ -219,6 +280,7 @@ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs):
# In text mode this class will be wrapped and TextIOWrapper requires a
# different behavior of flush().
self._text_mode = text_mode
self._retry = retry
self._upload_kwargs = upload_kwargs

@property
Expand Down Expand Up @@ -259,20 +321,32 @@ def write(self, b):
return pos

def _initiate_upload(self):
# num_retries is only supported for backwards-compatibility reasons.
num_retries = self._upload_kwargs.pop("num_retries", None)
retry = self._retry
content_type = self._upload_kwargs.pop("content_type", None)

if (
self._upload_kwargs.get("if_metageneration_match") is None
and num_retries is None
):
# Uploads are only idempotent (safe to retry) if
# if_metageneration_match is set. If it is not set, the default
# num_retries should be 0. Note: Because retry logic for uploads is
# provided by the google-resumable-media-python package, it doesn't
# use the ConditionalRetryStrategy class used in other API calls in
# this library to solve this problem.
num_retries = 0
if num_retries is not None:
warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2)
# num_retries and retry are mutually exclusive. If num_retries is
# set and retry is exactly the default, then nullify retry for
# backwards compatibility.
if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED:
retry = None

# Handle ConditionalRetryPolicy.
if isinstance(retry, ConditionalRetryPolicy):
# Conditional retries are designed for non-media calls, which change
# arguments into query_params dictionaries. Media operations work
# differently, so here we make a "fake" query_params to feed to the
# ConditionalRetryPolicy.
query_params = {
"ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
"ifMetagenerationMatch": self._upload_kwargs.get(
"if_metageneration_match"
),
}
retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

self._upload_and_transport = self._blob._initiate_resumable_upload(
self._blob.bucket.client,
Expand All @@ -281,6 +355,7 @@ def _initiate_upload(self):
None,
num_retries,
chunk_size=self._chunk_size,
retry=retry,
**self._upload_kwargs
)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -30,7 +30,7 @@
dependencies = [
"google-auth >= 1.11.0, < 2.0dev",
"google-cloud-core >= 1.4.1, < 2.0dev",
"google-resumable-media >= 1.2.0, < 2.0dev",
"google-resumable-media >= 1.3.0, < 2.0dev",
"requests >= 2.18.0, < 3.0.0dev",
"googleapis-common-protos < 1.53.0; python_version<'3.0'",
]
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/test__helpers.py
Expand Up @@ -593,6 +593,45 @@ def test_hostname_and_scheme(self):
self.assertEqual(self._call_fut(host=HOST, scheme=SCHEME), EXPECTED_URL)


class Test__api_core_retry_to_resumable_media_retry(unittest.TestCase):
    """Tests for the Retry -> RetryStrategy translation helper."""

    @staticmethod
    def _call_fut(**kwargs):
        from google.cloud.storage._helpers import (
            _api_core_retry_to_resumable_media_retry,
        )

        return _api_core_retry_to_resumable_media_retry(**kwargs)

    def test_conflict(self):
        # Passing both `retry` and `num_retries` must raise.
        with self.assertRaises(ValueError):
            self._call_fut(retry=DEFAULT_RETRY, num_retries=2)

    def test_retry(self):
        # Backoff configuration is copied from the Retry object.
        strategy = self._call_fut(retry=DEFAULT_RETRY)
        self.assertEqual(strategy.max_sleep, DEFAULT_RETRY._maximum)
        self.assertEqual(strategy.max_cumulative_retry, DEFAULT_RETRY._deadline)
        self.assertEqual(strategy.initial_delay, DEFAULT_RETRY._initial)
        self.assertEqual(strategy.multiplier, DEFAULT_RETRY._multiplier)

    def test_num_retries(self):
        # Legacy `num_retries` maps directly onto max_retries.
        strategy = self._call_fut(retry=None, num_retries=2)
        self.assertEqual(strategy.max_retries, 2)

    def test_none(self):
        # No retry configuration at all disables retries.
        strategy = self._call_fut(retry=None)
        self.assertEqual(strategy.max_retries, 0)


class _MD5Hash(object):
def __init__(self, digest_val):
self.digest_val = digest_val
Expand Down