Skip to content

Commit

Permalink
feat: media operation retries can be configured using the same interf…
Browse files Browse the repository at this point in the history
…ace as with non-media operation (#447)

All media operation calls (downloads and uploads) can be configured with Retry objects and ConditionalRetryPolicy objects, nearly identically to non-media operations. This is accomplished by converting the Retry object to a google-resumable-media-python library RetryStrategy object at the point of entry to that library.

Custom predicates of Retry objects (for instance set with Retry(predicate=...)) are not supported for media operations; they will be replaced with a media-operation-specific predicate.

This change is backwards-compatible for users of public methods that use the num_retries argument to configure uploads; num_retries continues to be supported, but the deprecation warning remains in effect. It will be fully removed and replaced with Retry objects in the future.

With this change, the default parameters for media operation retries change to be uniform with non-media operation retries. Specifically, the retry deadline for media operation retries becomes 120 seconds unless otherwise configured.
  • Loading branch information
andrewsg committed Jun 11, 2021
1 parent 78b2eba commit 0dbbb8a
Show file tree
Hide file tree
Showing 9 changed files with 1,127 additions and 100 deletions.
41 changes: 41 additions & 0 deletions google/cloud/storage/_helpers.py
Expand Up @@ -23,6 +23,7 @@
import os

from six.moves.urllib.parse import urlsplit
from google import resumable_media
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
Expand All @@ -45,6 +46,12 @@
("if_source_metageneration_not_match", "ifSourceMetagenerationNotMatch"),
)

# Deprecation warning emitted when callers still pass ``num_retries`` to
# upload methods; they should migrate to the ``retry`` argument instead.
_NUM_RETRIES_MESSAGE = (
    "`num_retries` has been deprecated and will be removed in a future "
    "release. Use the `retry` argument with a Retry or ConditionalRetryPolicy "
    "object, or None, instead."
)


def _get_storage_host():
    """Return the storage API host, preferring the emulator env var if set."""
    host = os.environ.get(STORAGE_EMULATOR_ENV_VAR)
    if host is None:
        host = _DEFAULT_STORAGE_HOST
    return host
Expand Down Expand Up @@ -524,3 +531,37 @@ def _bucket_bound_hostname_url(host, scheme=None):
return host

return "{scheme}://{host}/".format(scheme=scheme, host=host)


def _api_core_retry_to_resumable_media_retry(retry, num_retries=None):
"""Convert google.api.core.Retry to google.resumable_media.RetryStrategy.
Custom predicates are not translated.
:type retry: google.api_core.Retry
:param retry: (Optional) The google.api_core.Retry object to translate.
:type num_retries: int
:param num_retries: (Optional) The number of retries desired. This is
supported for backwards compatibility and is mutually exclusive with
`retry`.
:rtype: google.resumable_media.RetryStrategy
:returns: A RetryStrategy with all applicable attributes copied from input,
or a RetryStrategy with max_retries set to 0 if None was input.
"""

if retry is not None and num_retries is not None:
raise ValueError("num_retries and retry arguments are mutually exclusive")

elif retry is not None:
return resumable_media.RetryStrategy(
max_sleep=retry._maximum,
max_cumulative_retry=retry._deadline,
initial_delay=retry._initial,
multiplier=retry._multiplier,
)
elif num_retries is not None:
return resumable_media.RetryStrategy(max_retries=num_retries)
else:
return resumable_media.RetryStrategy(max_retries=0)
350 changes: 320 additions & 30 deletions google/cloud/storage/blob.py

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions google/cloud/storage/client.py
Expand Up @@ -53,6 +53,7 @@
from google.cloud.storage.acl import DefaultObjectACL
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import ConditionalRetryPolicy


_marker = object()
Expand Down Expand Up @@ -972,6 +973,7 @@ def download_blob_to_file(
if_metageneration_not_match=None,
timeout=_DEFAULT_TIMEOUT,
checksum="md5",
retry=DEFAULT_RETRY,
):
"""Download the contents of a blob object or blob URI into a file-like object.
Expand Down Expand Up @@ -1021,6 +1023,27 @@ def download_blob_to_file(
downloads where chunk_size is set) an INFO-level log will be
emitted. Supported values are "md5", "crc32c" and None. The default
is "md5".
retry (google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy)
(Optional) How to retry the RPC. A None value will disable
retries. A google.api_core.retry.Retry value will enable retries,
and the object will define retriable response codes and errors and
configure backoff and timeout options.
A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
Retry object and activates it only if certain conditions are met.
This class exists to provide safe defaults for RPC calls that are
not technically safe to retry normally (due to potential data
duplication or other side-effects) but become safe to retry if a
condition such as if_metageneration_match is set.
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
Media operations (downloads and uploads) do not support non-default
predicates in a Retry object. The default will always be used. Other
configuration changes for Retry objects such as delays and deadlines
are respected.
Examples:
Download a blob using a blob resource.
Expand All @@ -1046,6 +1069,19 @@ def download_blob_to_file(
"""

# Handle ConditionalRetryPolicy.
if isinstance(retry, ConditionalRetryPolicy):
# Conditional retries are designed for non-media calls, which change
# arguments into query_params dictionaries. Media operations work
# differently, so here we make a "fake" query_params to feed to the
# ConditionalRetryPolicy.
query_params = {
"ifGenerationMatch": if_generation_match,
"ifMetagenerationMatch": if_metageneration_match,
}
retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

if not isinstance(blob_or_uri, Blob):
blob_or_uri = Blob.from_string(blob_or_uri)
download_url = blob_or_uri._get_download_url(
Expand All @@ -1070,6 +1106,7 @@ def download_blob_to_file(
raw_download,
timeout=timeout,
checksum=checksum,
retry=retry,
)
except resumable_media.InvalidResponse as exc:
_raise_from_invalid_response(exc)
Expand Down Expand Up @@ -1222,6 +1259,8 @@ def list_blobs(
max_results=max_results,
extra_params=extra_params,
page_start=_blobs_page_start,
timeout=timeout,
retry=retry,
)
iterator.bucket = bucket
iterator.prefixes = set()
Expand Down
103 changes: 89 additions & 14 deletions google/cloud/storage/fileio.py
Expand Up @@ -13,8 +13,14 @@
# limitations under the License.

import io
import warnings

from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED
from google.cloud.storage.retry import ConditionalRetryPolicy


# Resumable uploads require a chunk size of precisely a multiple of 256 KiB.
CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB
Expand All @@ -28,20 +34,22 @@
"if_metageneration_match",
"if_metageneration_not_match",
"timeout",
"retry",
}

# Valid keyword arguments for upload methods.
# Note: Changes here need to be reflected in the blob.open() docstring.
VALID_UPLOAD_KWARGS = {
"content_type",
"num_retries",
"predefined_acl",
"num_retries",
"if_generation_match",
"if_generation_not_match",
"if_metageneration_match",
"if_metageneration_not_match",
"timeout",
"checksum",
"retry",
}


Expand All @@ -58,13 +66,35 @@ class BlobReader(io.BufferedIOBase):
bytes than the chunk_size are requested, the remainder is buffered.
The default is the chunk_size of the blob, or 40MiB.
:type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
:param retry: (Optional) How to retry the RPC. A None value will disable
retries. A google.api_core.retry.Retry value will enable retries,
and the object will define retriable response codes and errors and
configure backoff and timeout options.
A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
Retry object and activates it only if certain conditions are met.
This class exists to provide safe defaults for RPC calls that are
not technically safe to retry normally (due to potential data
duplication or other side-effects) but become safe to retry if a
condition such as if_metageneration_match is set.
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
Media operations (downloads and uploads) do not support non-default
predicates in a Retry object. The default will always be used. Other
configuration changes for Retry objects such as delays and deadlines
are respected.
:param download_kwargs: Keyword arguments to pass to the underlying API
calls. The following arguments are supported: "if_generation_match",
"if_generation_not_match", "if_metageneration_match",
"if_metageneration_not_match", "timeout".
"""

def __init__(self, blob, chunk_size=None, **download_kwargs):
def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
"""docstring note that download_kwargs also used for reload()"""
for kwarg in download_kwargs:
if kwarg not in VALID_DOWNLOAD_KWARGS:
Expand All @@ -76,6 +106,7 @@ def __init__(self, blob, chunk_size=None, **download_kwargs):
self._pos = 0
self._buffer = io.BytesIO()
self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
self._retry = retry
self._download_kwargs = download_kwargs

def read(self, size=-1):
Expand All @@ -102,6 +133,7 @@ def read(self, size=-1):
start=fetch_start,
end=fetch_end,
checksum=None,
retry=self._retry,
**self._download_kwargs
)
except RequestRangeNotSatisfiable:
Expand Down Expand Up @@ -197,14 +229,43 @@ class BlobWriter(io.BufferedIOBase):
changes the behavior of flush() to conform to TextIOWrapper's
expectations.
:type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
:param retry: (Optional) How to retry the RPC. A None value will disable
retries. A google.api_core.retry.Retry value will enable retries,
and the object will define retriable response codes and errors and
configure backoff and timeout options.
A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
Retry object and activates it only if certain conditions are met.
This class exists to provide safe defaults for RPC calls that are
not technically safe to retry normally (due to potential data
duplication or other side-effects) but become safe to retry if a
condition such as if_metageneration_match is set.
See the retry.py source code and docstrings in this package
(google.cloud.storage.retry) for information on retry types and how
to configure them.
Media operations (downloads and uploads) do not support non-default
predicates in a Retry object. The default will always be used. Other
configuration changes for Retry objects such as delays and deadlines
are respected.
:param upload_kwargs: Keyword arguments to pass to the underlying API
calls. The following arguments are supported: "if_generation_match",
"if_generation_not_match", "if_metageneration_match",
"if_metageneration_not_match", "timeout", "content_type",
"num_retries", "predefined_acl", "checksum".
"""

def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs):
def __init__(
self,
blob,
chunk_size=None,
text_mode=False,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
**upload_kwargs
):
for kwarg in upload_kwargs:
if kwarg not in VALID_UPLOAD_KWARGS:
raise ValueError(
Expand All @@ -219,6 +280,7 @@ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs):
# In text mode this class will be wrapped and TextIOWrapper requires a
# different behavior of flush().
self._text_mode = text_mode
self._retry = retry
self._upload_kwargs = upload_kwargs

@property
Expand Down Expand Up @@ -259,20 +321,32 @@ def write(self, b):
return pos

def _initiate_upload(self):
# num_retries is only supported for backwards-compatibility reasons.
num_retries = self._upload_kwargs.pop("num_retries", None)
retry = self._retry
content_type = self._upload_kwargs.pop("content_type", None)

if (
self._upload_kwargs.get("if_metageneration_match") is None
and num_retries is None
):
# Uploads are only idempotent (safe to retry) if
# if_metageneration_match is set. If it is not set, the default
# num_retries should be 0. Note: Because retry logic for uploads is
# provided by the google-resumable-media-python package, it doesn't
# use the ConditionalRetryStrategy class used in other API calls in
# this library to solve this problem.
num_retries = 0
if num_retries is not None:
warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2)
# num_retries and retry are mutually exclusive. If num_retries is
# set and retry is exactly the default, then nullify retry for
# backwards compatibility.
if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED:
retry = None

# Handle ConditionalRetryPolicy.
if isinstance(retry, ConditionalRetryPolicy):
# Conditional retries are designed for non-media calls, which change
# arguments into query_params dictionaries. Media operations work
# differently, so here we make a "fake" query_params to feed to the
# ConditionalRetryPolicy.
query_params = {
"ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
"ifMetagenerationMatch": self._upload_kwargs.get(
"if_metageneration_match"
),
}
retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

self._upload_and_transport = self._blob._initiate_resumable_upload(
self._blob.bucket.client,
Expand All @@ -281,6 +355,7 @@ def _initiate_upload(self):
None,
num_retries,
chunk_size=self._chunk_size,
retry=retry,
**self._upload_kwargs
)

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -30,7 +30,7 @@
dependencies = [
"google-auth >= 1.11.0, < 2.0dev",
"google-cloud-core >= 1.4.1, < 2.0dev",
"google-resumable-media >= 1.2.0, < 2.0dev",
"google-resumable-media >= 1.3.0, < 2.0dev",
"requests >= 2.18.0, < 3.0.0dev",
"googleapis-common-protos < 1.53.0; python_version<'3.0'",
]
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/test__helpers.py
Expand Up @@ -593,6 +593,45 @@ def test_hostname_and_scheme(self):
self.assertEqual(self._call_fut(host=HOST, scheme=SCHEME), EXPECTED_URL)


class Test__api_core_retry_to_resumable_media_retry(unittest.TestCase):
    """Tests for translating api_core Retry objects into RetryStrategy."""

    @staticmethod
    def _call_fut(**kwargs):
        # Import inside the helper to defer the dependency until test time.
        from google.cloud.storage._helpers import (
            _api_core_retry_to_resumable_media_retry,
        )

        return _api_core_retry_to_resumable_media_retry(**kwargs)

    def test_conflict(self):
        # retry and num_retries are mutually exclusive.
        with self.assertRaises(ValueError):
            self._call_fut(retry=DEFAULT_RETRY, num_retries=2)

    def test_retry(self):
        # Backoff settings are copied from the Retry object.
        strategy = self._call_fut(retry=DEFAULT_RETRY)
        self.assertEqual(strategy.max_sleep, DEFAULT_RETRY._maximum)
        self.assertEqual(strategy.max_cumulative_retry, DEFAULT_RETRY._deadline)
        self.assertEqual(strategy.initial_delay, DEFAULT_RETRY._initial)
        self.assertEqual(strategy.multiplier, DEFAULT_RETRY._multiplier)

    def test_num_retries(self):
        # Legacy num_retries maps directly onto max_retries.
        strategy = self._call_fut(retry=None, num_retries=2)
        self.assertEqual(strategy.max_retries, 2)

    def test_none(self):
        # No retry configuration at all disables retries.
        strategy = self._call_fut(retry=None)
        self.assertEqual(strategy.max_retries, 0)


class _MD5Hash(object):
def __init__(self, digest_val):
self.digest_val = digest_val
Expand Down

0 comments on commit 0dbbb8a

Please sign in to comment.