From 9a0eede42f434b3d7452b7af7e885ae581f7dbab Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 30 Mar 2021 11:25:11 -0700 Subject: [PATCH 1/8] fix: Ensure consistency check in test runs even if expected error occurs --- tests/unit/test_fileio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index 8056c045f..0ac16ab24 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -388,7 +388,7 @@ def test_seek(self): with self.assertRaises(ValueError): pos = buff.tell() buff.seek(len(TEST_BINARY_DATA) + 1) - self.assertEqual(pos, buff.tell()) + self.assertEqual(pos, buff.tell()) # Read 8 bytes, test seek backwards, read again, and flush. self.assertEqual(buff.read(8), TEST_BINARY_DATA[:8]) @@ -401,7 +401,7 @@ def test_seek(self): with self.assertRaises(ValueError): pos = buff.tell() buff.seek(0) - self.assertEqual(pos, buff.tell()) + self.assertEqual(pos, buff.tell()) def test_close(self): buff = SlidingBuffer() From f43f63046149371ac2d4488cdc9a44f666549de3 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 12 May 2021 13:27:11 -0700 Subject: [PATCH 2/8] tests pass --- google/cloud/storage/_helpers.py | 40 +++++++++++ google/cloud/storage/blob.py | 112 ++++++++++++++++++++----------- google/cloud/storage/client.py | 12 ++++ google/cloud/storage/fileio.py | 56 +++++++++++----- tests/unit/test_blob.py | 71 +++++++++++--------- 5 files changed, 206 insertions(+), 85 deletions(-) diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 338b79861..911eeb5d2 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -23,6 +23,7 @@ import os from six.moves.urllib.parse import urlsplit +from google import resumable_media from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED @@ -45,6 +46,11 @@ ("if_source_metageneration_not_match", "ifSourceMetagenerationNotMatch"), ) +_NUM_RETRIES_MESSAGE = ( + "`num_retries` has been deprecated and will be removed in a future " + "release. Use the `retry` argument with a Retry or ConditionalRetryPolicy " + "object, or None, instead." +) def _get_storage_host(): return os.environ.get(STORAGE_EMULATOR_ENV_VAR, _DEFAULT_STORAGE_HOST) @@ -563,3 +569,37 @@ def _bucket_bound_hostname_url(host, scheme=None): return host return "{scheme}://{host}/".format(scheme=scheme, host=host) + + +def _api_core_retry_to_resumable_media_retry(retry): + """Convert google.api.core.Retry to google.resumable_media.RetryStrategy. + + Custom predicates are not translated. + + :type retry: google.api_core.Retry + :param retry: (Optional) The google.api_core.Retry object to translate. + + :rtype: google.resumable_media.RetryStrategy + :returns: A RetryStrategy with all applicable attributes copied from input, + or a RetryStrategy with max_retries set to 0 if None was input. + """ + + if retry is not None: + return resumable_media.RetryStrategy(max_sleep=retry._maximum, max_cumulative_retry=retry._deadline, initial_delay=retry._initial, multiplier=retry._multiplier) + else: + return resumable_media.RetryStrategy(max_retries=0) + +def _retry_from_num_retries(num_retries): + """Convert num_retries into a Retry object. + + Retry objects have deadlines but not a maximum number of retries. This + function chooses a deadline that will approximate the requested number of + retries. 
num_retries is deprecated, and this conversion approximates the previous behavior
+ + This private method does not accept ConditionalRetryPolicy values + because the information necessary to evaluate the policy is instead + evaluated in client.download_blob_to_file(). + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. """ + if self.chunk_size is None: if raw_download: klass = RawDownload @@ -958,6 +969,7 @@ def _do_download( end=end, checksum=checksum, ) + download._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) response = download.consume(transport, timeout=timeout) self._extract_headers_from_download(response) else: @@ -980,6 +992,7 @@ def _do_download( end=end, ) + download._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) while not download.finished: download.consume_next_chunk(transport, timeout=timeout) @@ -996,6 +1009,7 @@ def download_to_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """DEPRECATED. Download the contents of this blob into a file-like object. @@ -1101,6 +1115,7 @@ def download_to_file( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) def download_to_filename( @@ -1116,6 +1131,7 @@ def download_to_filename( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of this blob into a named file. @@ -1198,6 +1214,7 @@ def download_to_filename( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) except resumable_media.DataCorruption: # Delete the corrupt downloaded file. @@ -1224,6 +1241,7 @@ def download_as_bytes( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of this blob as a bytes object. @@ -1305,6 +1323,7 @@ def download_as_bytes( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) return string_buffer.getvalue() @@ -1319,6 +1338,7 @@ def download_as_string( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ): """(Deprecated) Download the contents of this blob as a bytes object. @@ -1394,6 +1414,7 @@ def download_as_string( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + retry=retry, ) def download_as_text( @@ -1408,6 +1429,7 @@ def download_as_text( if_metageneration_match=None, if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ): """Download the contents of this blob as text (*not* bytes). 
@@ -1478,6 +1500,7 @@ def download_as_text( if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, + retry=retry, ) if encoding is not None: @@ -1582,7 +1605,7 @@ def _do_multipart_upload( stream, content_type, size, - num_retries, + retry, predefined_acl, if_generation_match, if_generation_not_match, @@ -1729,10 +1752,7 @@ def _do_multipart_upload( upload_url = _add_query_parameters(base_url, name_value_pairs) upload = MultipartUpload(upload_url, headers=headers, checksum=checksum) - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries - ) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) response = upload.transmit( transport, data, object_metadata, content_type, timeout=timeout @@ -1746,7 +1766,7 @@ def _initiate_resumable_upload( stream, content_type, size, - num_retries, + retry, predefined_acl=None, extra_headers=None, chunk_size=None, @@ -1913,10 +1933,7 @@ def _initiate_resumable_upload( upload_url, chunk_size, headers=headers, checksum=checksum ) - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries - ) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) upload.initiate( transport, @@ -1936,7 +1953,7 @@ def _do_resumable_upload( stream, content_type, size, - num_retries, + retry, predefined_acl, if_generation_match, if_generation_not_match, @@ -2036,7 +2053,7 @@ def _do_resumable_upload( stream, content_type, size, - num_retries, + retry, predefined_acl=predefined_acl, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, @@ -2062,7 +2079,7 @@ def _do_upload( stream, content_type, size, - num_retries, + retry, predefined_acl, if_generation_match, if_generation_not_match, @@ -2162,14 +2179,15 @@ def _do_upload( **only** response in the multipart case and it will be the **final** response in the resumable case. """ - if if_metageneration_match is None and num_retries is None: - # Uploads are only idempotent (safe to retry) if - # if_metageneration_match is set. If it is not set, the default - # num_retries should be 0. Note: Because retry logic for uploads is - # provided by the google-resumable-media-python package, it doesn't - # use the ConditionalRetryStrategy class used in other API calls in - # this library to solve this problem. - num_retries = 0 + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. 
+            query_params = {
+                "ifGenerationMatch": if_generation_match,
+                "ifMetagenerationMatch": if_metageneration_match,
+            }
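+            # get_retry_policy_if_conditions_met() returns the wrapped Retry
+            # when the policy's predicate accepts these query_params (for
+            # DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, when
+            # "ifMetagenerationMatch" is not None), and returns None otherwise.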
:returns: A 'BlobReader' or 'BlobWriter' from 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index 858fecdce..d20489134 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -53,6 +53,7 @@ from google.cloud.storage.acl import DefaultObjectACL from google.cloud.storage.constants import _DEFAULT_TIMEOUT from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import ConditionalRetryPolicy _marker = object() @@ -628,6 +629,7 @@ def download_blob_to_file( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum="md5", + retry=DEFAULT_RETRY, ): """Download the contents of a blob object or blob URI into a file-like object. @@ -702,6 +704,16 @@ def download_blob_to_file( """ + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. + query_params = {"ifGenerationMatch": if_generation_match, "ifMetagenerationMatch": if_metageneration_match} + retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) + if not isinstance(blob_or_uri, Blob): blob_or_uri = Blob.from_string(blob_or_uri) download_url = blob_or_uri._get_download_url( diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 53d3d14ab..d2d57126a 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -15,6 +15,11 @@ import io from google.api_core.exceptions import RequestRangeNotSatisfiable +from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE +from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED +from google.cloud.storage.retry import ConditionalRetryPolicy + # Resumable uploads require a chunk size of precisely a multiple of 256 KiB. CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB @@ -28,20 +33,22 @@ "if_metageneration_match", "if_metageneration_not_match", "timeout", + "retry", } # Valid keyword arguments for upload methods. # Note: Changes here need to be reflected in the blob.open() docstring. VALID_UPLOAD_KWARGS = { "content_type", - "num_retries", "predefined_acl", + "num_retries", "if_generation_match", "if_generation_not_match", "if_metageneration_match", "if_metageneration_not_match", "timeout", "checksum", + "retry", } @@ -58,13 +65,15 @@ class BlobReader(io.BufferedIOBase): bytes than the chunk_size are requested, the remainder is buffered. The default is the chunk_size of the blob, or 40MiB. + + :param download_kwargs: Keyword arguments to pass to the underlying API calls. The following arguments are supported: "if_generation_match", "if_generation_not_match", "if_metageneration_match", "if_metageneration_not_match", "timeout". 
""" - def __init__(self, blob, chunk_size=None, **download_kwargs): + def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs): """docstring note that download_kwargs also used for reload()""" for kwarg in download_kwargs: if kwarg not in VALID_DOWNLOAD_KWARGS: @@ -76,6 +85,7 @@ def __init__(self, blob, chunk_size=None, **download_kwargs): self._pos = 0 self._buffer = io.BytesIO() self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE + self._retry=retry self._download_kwargs = download_kwargs def read(self, size=-1): @@ -102,6 +112,7 @@ def read(self, size=-1): start=fetch_start, end=fetch_end, checksum=None, + retry=self._retry, **self._download_kwargs ) except RequestRangeNotSatisfiable: @@ -204,7 +215,7 @@ class BlobWriter(io.BufferedIOBase): "num_retries", "predefined_acl", "checksum". """ - def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): + def __init__(self, blob, chunk_size=None, text_mode=False, retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, **upload_kwargs): for kwarg in upload_kwargs: if kwarg not in VALID_UPLOAD_KWARGS: raise ValueError( @@ -219,6 +230,7 @@ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): # In text mode this class will be wrapped and TextIOWrapper requires a # different behavior of flush(). self._text_mode = text_mode + self._retry = retry self._upload_kwargs = upload_kwargs @property @@ -259,27 +271,41 @@ def write(self, b): return pos def _initiate_upload(self): + # num_retries is only supported for backwards-compatibility reasons. num_retries = self._upload_kwargs.pop("num_retries", None) + retry = self._retry content_type = self._upload_kwargs.pop("content_type", None) - if ( - self._upload_kwargs.get("if_metageneration_match") is None - and num_retries is None - ): - # Uploads are only idempotent (safe to retry) if - # if_metageneration_match is set. If it is not set, the default - # num_retries should be 0. Note: Because retry logic for uploads is - # provided by the google-resumable-media-python package, it doesn't - # use the ConditionalRetryStrategy class used in other API calls in - # this library to solve this problem. - num_retries = 0 + # Handle num_retries backwards-compatibility. + if num_retries is not None: + warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2) + # Convert num_retries into a Retry object. Retry objects don't have + # a maximum number of retries, just a deadline in seconds, so we + # attempt to convert num_retries into a deadline sufficient to do + # that number of retries and no more. + if retry is not None: + raise ValueError("num_retries and retry arguments are mutually exclusive") + elif num_retries < 1: + retry = None + else: + deadline = DEFAULT_RETRY.initial * ((DEFAULT_RETRY.multiplier ** (num_retries+1)) - DEFAULT_RETRY.multiplier) + retry = DEFAULT_RETRY.with_deadline(deadline) + + # Handle ConditionalRetryPolicy. + if isinstance(retry, ConditionalRetryPolicy): + # Conditional retries are designed for non-media calls, which change + # arguments into query_params dictionaries. Media operations work + # differently, so here we make a "fake" query_params to feed to the + # ConditionalRetryPolicy. 
+            query_params = {
+                "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
+                "ifMetagenerationMatch": self._upload_kwargs.get(
+                    "if_metageneration_match"
+                ),
+            }
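+            # Unlike Blob._do_upload(), the preconditions live in
+            # self._upload_kwargs here; .get() maps an unset precondition to
+            # None, so the predicate sees the same shape in both code paths.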
-            self.assertEqual(retry_strategy.max_retries, num_retries)
- num_retries = kwargs.get("num_retries") + if "retry" in kwargs: + retry = kwargs["retry"] + else: + retry = DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED blob._do_upload.assert_called_once_with( client, stream, content_type, len(data), - num_retries, + retry, predefined_acl, if_generation_match, if_generation_not_match, @@ -2871,10 +2872,16 @@ def test_upload_from_file_success(self): def test_upload_from_file_with_retries(self, mock_warn): from google.cloud.storage import blob as blob_module - self._upload_from_file_helper(num_retries=20) - mock_warn.assert_called_once_with( - blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 - ) + self._upload_from_file_helper(retry=DEFAULT_RETRY) + +# @mock.patch("warnings.warn") +# def test_upload_from_file_with_num_retries(self, mock_warn): +# from google.cloud.storage import blob as blob_module + +# self._upload_from_file_helper(num_retries=2) +# mock_warn.assert_called_once_with( +# blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 +# ) def test_upload_from_file_with_rewind(self): stream = self._upload_from_file_helper(rewind=True) @@ -2912,7 +2919,7 @@ def _do_upload_mock_call_helper( self.assertEqual(pos_args[0], client) self.assertEqual(pos_args[2], content_type) self.assertEqual(pos_args[3], size) - self.assertIsNone(pos_args[4]) # num_retries + self.assertEqual(pos_args[4], DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED) # retry self.assertIsNone(pos_args[5]) # predefined_acl self.assertIsNone(pos_args[6]) # if_generation_match self.assertIsNone(pos_args[7]) # if_generation_not_match From 0d3d260aebd7681821f125d599fe03da3a849064 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 17 May 2021 14:10:52 -0700 Subject: [PATCH 3/8] tests pass with refactor --- google/cloud/storage/_helpers.py | 28 ++++++++++------------- google/cloud/storage/blob.py | 39 ++++++++++++++++---------------- tests/unit/test_blob.py | 37 ++++++++++++++++++------------ 3 files changed, 54 insertions(+), 50 deletions(-) diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 911eeb5d2..6f29a3791 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -571,7 +571,7 @@ def _bucket_bound_hostname_url(host, scheme=None): return "{scheme}://{host}/".format(scheme=scheme, host=host) -def _api_core_retry_to_resumable_media_retry(retry): +def _api_core_retry_to_resumable_media_retry(retry, num_retries=None): """Convert google.api.core.Retry to google.resumable_media.RetryStrategy. Custom predicates are not translated. @@ -579,27 +579,23 @@ def _api_core_retry_to_resumable_media_retry(retry): :type retry: google.api_core.Retry :param retry: (Optional) The google.api_core.Retry object to translate. + :type num_retries: int + :param num_retries: (Optional) The number of retries desired. This is + supported for backwards compatibility and is mutually exclusive with + `retry`. + :rtype: google.resumable_media.RetryStrategy :returns: A RetryStrategy with all applicable attributes copied from input, or a RetryStrategy with max_retries set to 0 if None was input. 
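+        # Compute the retry value upload_from_file is expected to forward:
+        # an explicit retry kwarg wins, otherwise the new default
+        # DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED applies.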
""" - if retry is not None: + if retry is not None and num_retries is not None: + raise ValueError("num_retries and retry arguments are mutually exclusive") + + elif retry is not None: return resumable_media.RetryStrategy(max_sleep=retry._maximum, max_cumulative_retry=retry._deadline, initial_delay=retry._initial, multiplier=retry._multiplier) + elif num_retries is not None: + return resumable_media.RetryStrategy(max_retries=num_retries) else: return resumable_media.RetryStrategy(max_retries=0) -def _retry_from_num_retries(num_retries): - """Convert num_retries into a Retry object. - - Retry objects have deadlines but not a maximum number of retries. This - function chooses a deadline that will approximate the requested number of - retries. num_retries is deprecated and this approximates previous behavior - on a best-effort basis. - - :type num_retries: int - :param num_retries: The number of retries desired. - """ - - deadline = DEFAULT_RETRY.initial * ((DEFAULT_RETRY.multiplier ** (num_retries+1)) - DEFAULT_RETRY.multiplier) - return DEFAULT_RETRY.with_deadline(deadline) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 1ca51040d..1af8469a5 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -66,7 +66,6 @@ from google.cloud.storage._helpers import _convert_to_timestamp from google.cloud.storage._helpers import _raise_if_more_than_one_set from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry -from google.cloud.storage._helpers import _retry_from_num_retries from google.cloud.storage._signing import generate_signed_url_v2 from google.cloud.storage._signing import generate_signed_url_v4 from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE @@ -1605,7 +1604,7 @@ def _do_multipart_upload( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -1613,6 +1612,7 @@ def _do_multipart_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Perform a multipart upload. @@ -1752,7 +1752,7 @@ def _do_multipart_upload( upload_url = _add_query_parameters(base_url, name_value_pairs) upload = MultipartUpload(upload_url, headers=headers, checksum=checksum) - upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry, num_retries) response = upload.transmit( transport, data, object_metadata, content_type, timeout=timeout @@ -1766,7 +1766,7 @@ def _initiate_resumable_upload( stream, content_type, size, - retry, + num_retries, predefined_acl=None, extra_headers=None, chunk_size=None, @@ -1776,6 +1776,7 @@ def _initiate_resumable_upload( if_metageneration_not_match=None, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Initiate a resumable upload. @@ -1933,7 +1934,7 @@ def _initiate_resumable_upload( upload_url, chunk_size, headers=headers, checksum=checksum ) - upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry, num_retries) upload.initiate( transport, @@ -1953,7 +1954,7 @@ def _do_resumable_upload( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -1961,6 +1962,7 @@ def _do_resumable_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None, ): """Perform a resumable upload. 
@@ -2079,7 +2081,7 @@ def _do_upload( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -2087,6 +2089,7 @@ def _do_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, + retry=None ): """Determine an upload strategy and then perform the upload. @@ -2195,7 +2198,7 @@ def _do_upload( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -2203,6 +2206,7 @@ def _do_upload( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) else: response = self._do_resumable_upload( @@ -2210,7 +2214,7 @@ def _do_upload( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -2218,6 +2222,7 @@ def _do_upload( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) return response.json() @@ -2364,16 +2369,11 @@ def upload_from_file( """ if num_retries is not None: warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2) - # Convert num_retries into a Retry object. Retry objects don't have - # a maximum number of retries, just a deadline in seconds, so we - # attempt to convert num_retries into a deadline sufficient to do - # that number of retries and no more. - if retry is not None: - raise ValueError("num_retries and retry arguments are mutually exclusive") - elif num_retries < 1: + # num_retries and retry are mutually exclusive. If num_retries is + # set and retry is exactly the default, then nullify retry for + # backwards compatibility. + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: retry = None - else: - retry = _retry_from_num_retries(num_retries) _maybe_rewind(file_obj, rewind=rewind) predefined_acl = ACL.validate_predefined(predefined_acl) @@ -2384,7 +2384,7 @@ def upload_from_file( file_obj, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -2392,6 +2392,7 @@ def upload_from_file( if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry ) self._set_properties(created_json) except resumable_media.InvalidResponse as exc: diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index f08c677c5..89fce3a5e 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -2191,7 +2191,7 @@ def _initiate_resumable_helper( size=None, extra_headers=None, chunk_size=None, - retry=None, + num_retries=None, user_project=None, predefined_acl=None, if_generation_match=None, @@ -2203,6 +2203,7 @@ def _initiate_resumable_helper( timeout=None, metadata=None, mtls=False, + retry=None, ): from six.moves.urllib.parse import urlencode from google.resumable_media.requests import ResumableUpload @@ -2265,7 +2266,7 @@ def _initiate_resumable_helper( stream, content_type, size, - retry, + num_retries, extra_headers=extra_headers, chunk_size=chunk_size, predefined_acl=predefined_acl, @@ -2273,6 +2274,7 @@ def _initiate_resumable_helper( if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2340,6 +2342,8 @@ def _initiate_resumable_helper( retry_strategy = upload._retry_strategy if retry is None: self.assertEqual(retry_strategy.max_retries, 0) + elif num_retries is not None: + self.assertEqual(retry_strategy.max_retries, num_retries) else: self.assertEqual(retry_strategy.max_sleep, 60.0) 
self.assertEqual(retry_strategy.max_cumulative_retry, 120.0) @@ -2565,7 +2569,7 @@ def _do_resumable_upload_call2( def _do_resumable_helper( self, use_size=False, - retry=None, + num_retries=None, predefined_acl=None, if_generation_match=None, if_generation_not_match=None, @@ -2573,6 +2577,7 @@ def _do_resumable_helper( if_metageneration_not_match=None, timeout=None, data_corruption=False, + retry=None, ): bucket = _Bucket(name="yesterday") blob = self._make_one(u"blob-name", bucket=bucket) @@ -2694,7 +2699,7 @@ def test__do_resumable_upload_with_data_corruption(self): def _do_upload_helper( self, chunk_size=None, - retry=None, + num_retries=None, predefined_acl=None, if_generation_match=None, if_generation_not_match=None, @@ -2702,6 +2707,7 @@ def _do_upload_helper( if_metageneration_not_match=None, size=None, timeout=None, + retry=None, ): from google.cloud.storage.blob import _MAX_MULTIPART_SIZE @@ -2739,12 +2745,13 @@ def _do_upload_helper( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2756,7 +2763,7 @@ def _do_upload_helper( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -2764,6 +2771,7 @@ def _do_upload_helper( if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry, ) blob._do_resumable_upload.assert_not_called() else: @@ -2773,7 +2781,7 @@ def _do_upload_helper( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -2781,6 +2789,7 @@ def _do_upload_helper( if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry, ) def test__do_upload_uses_multipart(self): @@ -2832,6 +2841,8 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_generation_not_match = kwargs.get("if_generation_not_match", None) if_metageneration_match = kwargs.get("if_metageneration_match", None) if_metageneration_not_match = kwargs.get("if_metageneration_not_match", None) + num_retries = kwargs.get("num_retries", None) + retry = kwargs.get("retry", DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED) ret_val = blob.upload_from_file( stream, size=len(data), content_type=content_type, client=client, **kwargs ) @@ -2843,17 +2854,12 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): expected_timeout = kwargs.get("timeout", self._get_default_timeout()) - # Check the mock. 
- if "retry" in kwargs: - retry = kwargs["retry"] - else: - retry = DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED blob._do_upload.assert_called_once_with( client, stream, content_type, len(data), - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, @@ -2861,6 +2867,7 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_metageneration_not_match, timeout=expected_timeout, checksum=None, + retry=retry ) return stream @@ -2919,7 +2926,7 @@ def _do_upload_mock_call_helper( self.assertEqual(pos_args[0], client) self.assertEqual(pos_args[2], content_type) self.assertEqual(pos_args[3], size) - self.assertEqual(pos_args[4], DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED) # retry + self.assertEqual(pos_args[4], None) # num_retries self.assertIsNone(pos_args[5]) # predefined_acl self.assertIsNone(pos_args[6]) # if_generation_match self.assertIsNone(pos_args[7]) # if_generation_not_match @@ -2927,7 +2934,7 @@ def _do_upload_mock_call_helper( self.assertIsNone(pos_args[9]) # if_metageneration_not_match expected_timeout = self._get_default_timeout() if timeout is None else timeout - self.assertEqual(kwargs, {"timeout": expected_timeout, "checksum": None}) + self.assertEqual(kwargs, {"timeout": expected_timeout, "checksum": None, "retry": DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED}) return pos_args[1] From 265d3bc1482ed9d8329e86bcb5019b4009e1ef08 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Tue, 18 May 2021 13:05:06 -0700 Subject: [PATCH 4/8] version bump for dependencies; tests pass for real now --- google/cloud/storage/fileio.py | 19 ++--- setup.py | 2 +- tests/unit/test_blob.py | 30 ++++--- tests/unit/test_fileio.py | 144 +++++++++++++++++++++++++++++---- 4 files changed, 157 insertions(+), 38 deletions(-) diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index d2d57126a..2829ad293 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -13,6 +13,7 @@ # limitations under the License. import io +import warnings from google.api_core.exceptions import RequestRangeNotSatisfiable from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE @@ -276,20 +277,13 @@ def _initiate_upload(self): retry = self._retry content_type = self._upload_kwargs.pop("content_type", None) - # Handle num_retries backwards-compatibility. if num_retries is not None: warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2) - # Convert num_retries into a Retry object. Retry objects don't have - # a maximum number of retries, just a deadline in seconds, so we - # attempt to convert num_retries into a deadline sufficient to do - # that number of retries and no more. - if retry is not None: - raise ValueError("num_retries and retry arguments are mutually exclusive") - elif num_retries < 1: + # num_retries and retry are mutually exclusive. If num_retries is + # set and retry is exactly the default, then nullify retry for + # backwards compatibility. + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: retry = None - else: - deadline = DEFAULT_RETRY.initial * ((DEFAULT_RETRY.multiplier ** (num_retries+1)) - DEFAULT_RETRY.multiplier) - retry = DEFAULT_RETRY.with_deadline(deadline) # Handle ConditionalRetryPolicy. 
if isinstance(retry, ConditionalRetryPolicy): @@ -305,8 +299,9 @@ def _initiate_upload(self): self._buffer, content_type, None, - retry, + num_retries, chunk_size=self._chunk_size, + retry=retry, **self._upload_kwargs ) diff --git a/setup.py b/setup.py index e1974b607..4cbeea633 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ dependencies = [ "google-auth >= 1.11.0, < 2.0dev", "google-cloud-core >= 1.4.1, < 2.0dev", - "google-resumable-media >= 1.2.0, < 2.0dev", + "google-resumable-media >= 1.3.0, < 2.0dev", "requests >= 2.18.0, < 3.0.0dev", ] extras = {} diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 89fce3a5e..e001a0a7e 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -2340,10 +2340,11 @@ def _initiate_resumable_helper( self.assertEqual(upload._content_type, content_type) self.assertEqual(upload.resumable_url, resumable_url) retry_strategy = upload._retry_strategy - if retry is None: - self.assertEqual(retry_strategy.max_retries, 0) - elif num_retries is not None: + self.assertFalse(num_retries is not None and retry is not None) + if num_retries is not None and retry is None: self.assertEqual(retry_strategy.max_retries, num_retries) + elif retry is None: + self.assertEqual(retry_strategy.max_retries, 0) else: self.assertEqual(retry_strategy.max_sleep, 60.0) self.assertEqual(retry_strategy.max_cumulative_retry, 120.0) @@ -2426,6 +2427,9 @@ def test__initiate_resumable_upload_with_extra_headers(self): def test__initiate_resumable_upload_with_retry(self): self._initiate_resumable_helper(retry=DEFAULT_RETRY) + def test__initiate_resumable_upload_with_num_retries(self): + self._initiate_resumable_helper(num_retries=11) + def test__initiate_resumable_upload_with_generation_match(self): self._initiate_resumable_helper( if_generation_match=4, if_metageneration_match=4 @@ -2683,6 +2687,9 @@ def test__do_resumable_upload_with_size(self): def test__do_resumable_upload_with_retry(self): self._do_resumable_helper(retry=DEFAULT_RETRY) + def test__do_resumable_upload_with_num_retries(self): + self._do_resumable_helper(num_retries=8) + def test__do_resumable_upload_with_predefined_acl(self): self._do_resumable_helper(predefined_acl="private") @@ -2842,7 +2849,8 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_metageneration_match = kwargs.get("if_metageneration_match", None) if_metageneration_not_match = kwargs.get("if_metageneration_not_match", None) num_retries = kwargs.get("num_retries", None) - retry = kwargs.get("retry", DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED) + default_retry = DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + retry = kwargs.get("retry", default_retry) ret_val = blob.upload_from_file( stream, size=len(data), content_type=content_type, client=client, **kwargs ) @@ -2881,14 +2889,14 @@ def test_upload_from_file_with_retries(self, mock_warn): self._upload_from_file_helper(retry=DEFAULT_RETRY) -# @mock.patch("warnings.warn") -# def test_upload_from_file_with_num_retries(self, mock_warn): -# from google.cloud.storage import blob as blob_module + @mock.patch("warnings.warn") + def test_upload_from_file_with_num_retries(self, mock_warn): + from google.cloud.storage import blob as blob_module -# self._upload_from_file_helper(num_retries=2) -# mock_warn.assert_called_once_with( -# blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 -# ) + self._upload_from_file_helper(num_retries=2) + mock_warn.assert_called_once_with( + blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning, 
stacklevel=2 + ) def test_upload_from_file_with_rewind(self): stream = self._upload_from_file_helper(rewind=True) diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index 0ac16ab24..3ddb83273 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -21,6 +21,7 @@ from google.cloud.storage.fileio import BlobReader, BlobWriter, SlidingBuffer from google.api_core.exceptions import RequestRangeNotSatisfiable +from google.cloud.storage.retry import DEFAULT_RETRY TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n" TEST_BINARY_DATA = TEST_TEXT_DATA.encode("utf-8") @@ -37,7 +38,12 @@ def test_attributes(self): self.assertTrue(reader.seekable()) self.assertTrue(reader.readable()) self.assertFalse(reader.writable()) - self.assertEqual(256, reader._chunk_size) + self.assertEqual(reader._chunk_size, 256) + self.assertEqual(reader._retry, DEFAULT_RETRY) + + reader = BlobReader(blob, chunk_size=1024, retry=None) + self.assertEqual(reader._chunk_size, 1024) + self.assertIsNone(reader._retry) def test_read(self): blob = mock.Mock() @@ -52,7 +58,7 @@ def read_from_fake_data(start=0, end=None, **_): # Read and trigger the first download of chunk_size. self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1]) blob.download_as_bytes.assert_called_once_with( - start=0, end=8, checksum=None, **download_kwargs + start=0, end=8, checksum=None, retry=DEFAULT_RETRY, **download_kwargs ) # Read from buffered data only. @@ -64,7 +70,7 @@ def read_from_fake_data(start=0, end=None, **_): self.assertEqual(reader._pos, 12) self.assertEqual(blob.download_as_bytes.call_count, 2) blob.download_as_bytes.assert_called_with( - start=8, end=16, checksum=None, **download_kwargs + start=8, end=16, checksum=None, retry=DEFAULT_RETRY, **download_kwargs ) # Read a larger amount, requiring a download larger than chunk_size. @@ -72,14 +78,14 @@ def read_from_fake_data(start=0, end=None, **_): self.assertEqual(reader._pos, 28) self.assertEqual(blob.download_as_bytes.call_count, 3) blob.download_as_bytes.assert_called_with( - start=16, end=28, checksum=None, **download_kwargs + start=16, end=28, checksum=None, retry=DEFAULT_RETRY, **download_kwargs ) # Read all remaining data. self.assertEqual(reader.read(), TEST_BINARY_DATA[28:]) self.assertEqual(blob.download_as_bytes.call_count, 4) blob.download_as_bytes.assert_called_with( - start=28, end=None, checksum=None, **download_kwargs + start=28, end=None, checksum=None, retry=DEFAULT_RETRY, **download_kwargs ) reader.close() @@ -104,12 +110,12 @@ def read_from_fake_data(start=0, end=None, **_): # Read a line. With chunk_size=10, expect three chunks downloaded. self.assertEqual(reader.readline(), TEST_BINARY_DATA[:27]) - blob.download_as_bytes.assert_called_with(start=20, end=30, checksum=None) + blob.download_as_bytes.assert_called_with(start=20, end=30, checksum=None, retry=DEFAULT_RETRY) self.assertEqual(blob.download_as_bytes.call_count, 3) # Read another line. self.assertEqual(reader.readline(), TEST_BINARY_DATA[27:]) - blob.download_as_bytes.assert_called_with(start=50, end=60, checksum=None) + blob.download_as_bytes.assert_called_with(start=50, end=60, checksum=None, retry=DEFAULT_RETRY) self.assertEqual(blob.download_as_bytes.call_count, 6) blob.size = len(TEST_BINARY_DATA) @@ -118,7 +124,7 @@ def read_from_fake_data(start=0, end=None, **_): # Read all lines. The readlines algorithm will attempt to read past the end of the last line once to verify there is no more to read. 
self.assertEqual(b"".join(reader.readlines()), TEST_BINARY_DATA) blob.download_as_bytes.assert_called_with( - start=len(TEST_BINARY_DATA), end=len(TEST_BINARY_DATA) + 10, checksum=None + start=len(TEST_BINARY_DATA), end=len(TEST_BINARY_DATA) + 10, checksum=None, retry=DEFAULT_RETRY ) self.assertEqual(blob.download_as_bytes.call_count, 13) @@ -209,7 +215,11 @@ def test_attributes(self): self.assertFalse(writer.seekable()) self.assertFalse(writer.readable()) self.assertTrue(writer.writable()) - self.assertEqual(256 * 1024, writer._chunk_size) + self.assertEqual(writer._chunk_size, 256 * 1024) + + writer = BlobWriter(blob, chunk_size=512 * 1024, retry=DEFAULT_RETRY) + self.assertEqual(writer._chunk_size, 512 * 1024) + self.assertEqual(writer._retry, DEFAULT_RETRY) def test_reject_wrong_chunk_size(self): blob = mock.Mock() @@ -261,6 +271,7 @@ def test_write(self): None, NUM_RETRIES, chunk_size=chunk_size, + retry=None, **upload_kwargs ) upload.transmit_next_chunk.assert_called_with(transport) @@ -286,7 +297,7 @@ def test_seek_fails(self): with self.assertRaises(io.UnsupportedOperation): writer.seek() - def test_conditional_retries(self): + def test_conditional_retry_failure(self): blob = mock.Mock() upload = mock.Mock() @@ -302,7 +313,6 @@ def test_conditional_retries(self): writer = BlobWriter( blob, chunk_size=chunk_size, - num_retries=None, content_type=PLAIN_CONTENT_TYPE, ) @@ -319,15 +329,120 @@ def test_conditional_retries(self): # Write over chunk_size. This should result in upload initialization # and multiple chunks uploaded. - # Due to the condition not being fulfilled, num_retries should be 0. + # Due to the condition not being fulfilled, retry should be None. writer.write(TEST_BINARY_DATA[4:32]) blob._initiate_resumable_upload.assert_called_once_with( blob.bucket.client, writer._buffer, PLAIN_CONTENT_TYPE, - None, - 0, + None, # size + None, # num_retries + chunk_size=chunk_size, + retry=None, + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. + writer.write(TEST_BINARY_DATA[32:33]) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + def test_conditional_retry_pass(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + content_type=PLAIN_CONTENT_TYPE, + if_metageneration_match=1, + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + # Due to the condition being fulfilled, retry should be DEFAULT_RETRY. 
+ writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, # size + None, # num_retries + chunk_size=chunk_size, + retry=DEFAULT_RETRY, + if_metageneration_match=1, + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. + writer.write(TEST_BINARY_DATA[32:33]) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + def test_num_retries_only(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + content_type=PLAIN_CONTENT_TYPE, + num_retries=2, + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, # size + 2, # num_retries chunk_size=chunk_size, + retry=None, ) upload.transmit_next_chunk.assert_called_with(transport) self.assertEqual(upload.transmit_next_chunk.call_count, 4) @@ -606,5 +721,6 @@ def test_write(self): None, NUM_RETRIES, chunk_size=chunk_size, + retry=None, ) upload.transmit_next_chunk.assert_called_with(transport) From a8205385106018aeb177ab42ea8e4f3de8481715 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 20 May 2021 10:22:51 -0700 Subject: [PATCH 5/8] complete tests for download retries --- google/cloud/storage/blob.py | 11 ++-- google/cloud/storage/client.py | 1 + tests/unit/test__helpers.py | 29 ++++++++++ tests/unit/test_blob.py | 96 +++++++++++++++++++++++++++++----- tests/unit/test_client.py | 25 +++++++-- tests/unit/test_fileio.py | 7 ++- 6 files changed, 148 insertions(+), 21 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 1af8469a5..18d9f5b4c 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -887,7 +887,7 @@ def _do_download( raw_download=False, timeout=_DEFAULT_TIMEOUT, checksum="md5", - retry=DEFAULT_RETRY, + retry=None, ): """Perform a download without any error handling. @@ -954,6 +954,8 @@ def _do_download( to configure them. 
""" + retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + if self.chunk_size is None: if raw_download: klass = RawDownload @@ -968,7 +970,7 @@ def _do_download( end=end, checksum=checksum, ) - download._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + download._retry_strategy = retry_strategy response = download.consume(transport, timeout=timeout) self._extract_headers_from_download(response) else: @@ -991,7 +993,7 @@ def _do_download( end=end, ) - download._retry_strategy = _api_core_retry_to_resumable_media_retry(retry) + download._retry_strategy = retry_strategy while not download.finished: download.consume_next_chunk(transport, timeout=timeout) @@ -2055,7 +2057,7 @@ def _do_resumable_upload( stream, content_type, size, - retry, + num_retries, predefined_acl=predefined_acl, if_generation_match=if_generation_match, if_generation_not_match=if_generation_not_match, @@ -2063,6 +2065,7 @@ def _do_resumable_upload( if_metageneration_not_match=if_metageneration_not_match, timeout=timeout, checksum=checksum, + retry=retry, ) while not upload.finished: diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index d20489134..ec7803f44 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -738,6 +738,7 @@ def download_blob_to_file( raw_download, timeout=timeout, checksum=checksum, + retry=retry, ) except resumable_media.InvalidResponse as exc: _raise_from_invalid_response(exc) diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index fa989f96e..f7278df08 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -556,6 +556,35 @@ def test_hostname_and_scheme(self): self.assertEqual(self._call_fut(host=HOST, scheme=SCHEME), EXPECTED_URL) +class Test__api_core_retry_to_resumable_media_retry(unittest.TestCase): + def test_conflict(self): + from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + + with self.assertRaises(ValueError): + _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY, num_retries=2) + + def test_retry(self): + from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY) + self.assertEqual(retry_strategy.max_sleep, DEFAULT_RETRY._maximum) + self.assertEqual(retry_strategy.max_cumulative_retry, DEFAULT_RETRY._deadline) + self.assertEqual(retry_strategy.initial_delay, DEFAULT_RETRY._initial) + self.assertEqual(retry_strategy.multiplier, DEFAULT_RETRY._multiplier) + + def test_num_retries(self): + from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry=None, num_retries=2) + self.assertEqual(retry_strategy.max_retries, 2) + + def test_none(self): + from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + + retry_strategy = _api_core_retry_to_resumable_media_retry(retry=None) + self.assertEqual(retry_strategy.max_retries, 0) + + class _Connection(object): def __init__(self, *responses): self._responses = responses diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index e001a0a7e..247ceaaff 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -996,7 +996,7 @@ def _mock_requests_response(status_code, headers, content=b""): response.request = requests.Request("POST", "http://example.com").prepare() return response - def _do_download_helper_wo_chunks(self, w_range, raw_download, 
timeout=None): + def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None, **extra_kwargs): blob_name = "blob-name" client = mock.Mock() bucket = _Bucket(client) @@ -1020,6 +1020,8 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): expected_timeout = timeout timeout_kwarg = {"timeout": timeout} + extra_kwargs.update(timeout_kwarg) + with patch as patched: if w_range: blob._do_download( @@ -1030,7 +1032,7 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): start=1, end=3, raw_download=raw_download, - **timeout_kwarg + **extra_kwargs ) else: blob._do_download( @@ -1039,7 +1041,7 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): download_url, headers, raw_download=raw_download, - **timeout_kwarg + **extra_kwargs ) if w_range: @@ -1065,6 +1067,13 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None): transport, timeout=expected_timeout ) + retry_strategy = patched.return_value._retry_strategy + retry = extra_kwargs.get("retry", None) + if retry is None: + self.assertEqual(retry_strategy.max_retries, 0) + else: + self.assertEqual(retry_strategy.max_sleep, retry._maximum) + def test__do_download_wo_chunks_wo_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=False, raw_download=False) @@ -1225,6 +1234,7 @@ def test_download_to_file_with_failure(self): False, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) def test_download_to_file_wo_media_link(self): @@ -1255,6 +1265,7 @@ def test_download_to_file_wo_media_link(self): False, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) def test_download_to_file_w_generation_match(self): @@ -1284,9 +1295,10 @@ def test_download_to_file_w_generation_match(self): False, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY ) - def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): + def _download_to_file_helper(self, use_chunks, raw_download, timeout=None, **extra_kwargs): blob_name = "blob-name" client = self._make_client() bucket = _Bucket(client) @@ -1305,12 +1317,15 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): expected_timeout = timeout timeout_kwarg = {"timeout": timeout} + extra_kwargs.update(timeout_kwarg) + file_obj = io.BytesIO() if raw_download: - blob.download_to_file(file_obj, raw_download=True, **timeout_kwarg) + blob.download_to_file(file_obj, raw_download=True, **extra_kwargs) else: - blob.download_to_file(file_obj, **timeout_kwarg) + blob.download_to_file(file_obj, **extra_kwargs) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) headers = {"accept-encoding": "gzip"} blob._do_download.assert_called_once_with( client._http, @@ -1322,11 +1337,15 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None): raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry ) def test_download_to_file_wo_chunks_wo_raw(self): self._download_to_file_helper(use_chunks=False, raw_download=False) + def test_download_to_file_wo_chunks_no_retry(self): + self._download_to_file_helper(use_chunks=False, raw_download=False, retry=None) + def test_download_to_file_w_chunks_wo_raw(self): self._download_to_file_helper(use_chunks=True, raw_download=False) @@ -1341,7 +1360,7 @@ def test_download_to_file_w_custom_timeout(self): use_chunks=False, raw_download=False, timeout=9.58 ) - def _download_to_filename_helper(self, updated, raw_download, 
timeout=None): + def _download_to_filename_helper(self, updated, raw_download, timeout=None, **extra_kwargs): import os from google.cloud.storage._helpers import _convert_to_timestamp from google.cloud._testing import _NamedTemporaryFile @@ -1359,10 +1378,10 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): with _NamedTemporaryFile() as temp: if timeout is None: - blob.download_to_filename(temp.name, raw_download=raw_download) + blob.download_to_filename(temp.name, raw_download=raw_download, **extra_kwargs) else: blob.download_to_filename( - temp.name, raw_download=raw_download, timeout=timeout, + temp.name, raw_download=raw_download, timeout=timeout, **extra_kwargs ) if updated is None: @@ -1377,6 +1396,8 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): expected_timeout = self._get_default_timeout() if timeout is None else timeout + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + headers = {"accept-encoding": "gzip"} blob._do_download.assert_called_once_with( client._http, @@ -1388,6 +1409,7 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None): raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) stream = blob._do_download.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) @@ -1420,12 +1442,17 @@ def test_download_to_filename_w_generation_match(self): False, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) def test_download_to_filename_w_updated_wo_raw(self): updated = "2014-12-06T13:13:50.690Z" self._download_to_filename_helper(updated=updated, raw_download=False) + def test_download_to_filename_w_updated_no_retry(self): + updated = "2014-12-06T13:13:50.690Z" + self._download_to_filename_helper(updated=updated, raw_download=False, retry=None) + def test_download_to_filename_wo_updated_wo_raw(self): self._download_to_filename_helper(updated=None, raw_download=False) @@ -1478,6 +1505,7 @@ def test_download_to_filename_corrupted(self): False, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) stream = blob._do_download.mock_calls[0].args[1] self.assertEqual(stream.name, filename) @@ -1513,11 +1541,12 @@ def test_download_to_filename_w_key(self): False, timeout=self._get_default_timeout(), checksum="md5", + retry=DEFAULT_RETRY, ) stream = blob._do_download.mock_calls[0].args[1] self.assertEqual(stream.name, temp.name) - def _download_as_bytes_helper(self, raw_download, timeout=None): + def _download_as_bytes_helper(self, raw_download, timeout=None, **extra_kwargs): blob_name = "blob-name" client = self._make_client() bucket = _Bucket(client) @@ -1528,12 +1557,14 @@ def _download_as_bytes_helper(self, raw_download, timeout=None): if timeout is None: expected_timeout = self._get_default_timeout() - fetched = blob.download_as_bytes(raw_download=raw_download) + fetched = blob.download_as_bytes(raw_download=raw_download, **extra_kwargs) else: expected_timeout = timeout - fetched = blob.download_as_bytes(raw_download=raw_download, timeout=timeout) + fetched = blob.download_as_bytes(raw_download=raw_download, timeout=timeout, **extra_kwargs) self.assertEqual(fetched, b"") + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + headers = {"accept-encoding": "gzip"} blob._do_download.assert_called_once_with( client._http, @@ -1545,6 +1576,7 @@ def _download_as_bytes_helper(self, raw_download, timeout=None): raw_download, timeout=expected_timeout, checksum="md5", + retry=expected_retry, ) stream = 
blob._do_download.mock_calls[0].args[1] self.assertIsInstance(stream, io.BytesIO) @@ -1673,6 +1705,9 @@ def test_download_as_bytes_w_generation_match(self): def test_download_as_bytes_wo_raw(self): self._download_as_bytes_helper(raw_download=False) + def test_download_as_bytes_no_retry(self): + self._download_as_bytes_helper(raw_download=False, retry=None) + def test_download_as_bytes_w_raw(self): self._download_as_bytes_helper(raw_download=True) @@ -1695,6 +1730,7 @@ def _download_as_text_helper( no_charset=False, expected_value=u"DEADBEEF", payload=None, + **extra_kwargs, ): if payload is None: if encoding is not None: @@ -1745,10 +1781,14 @@ def _download_as_text_helper( else: kwargs["timeout"] = expected_timeout = timeout + kwargs.update(extra_kwargs) + fetched = blob.download_as_text(**kwargs) self.assertEqual(fetched, expected_value) + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + blob.download_as_bytes.assert_called_once_with( client=client, start=start, @@ -1759,12 +1799,15 @@ def _download_as_text_helper( if_generation_not_match=if_generation_not_match, if_metageneration_match=if_metageneration_match, if_metageneration_not_match=if_metageneration_not_match, - retry=DEFAULT_RETRY, + retry=expected_retry, ) def test_download_as_text_wo_raw(self): self._download_as_text_helper(raw_download=False) + def test_download_as_text_w_no_retry(self): + self._download_as_text_helper(raw_download=False, retry=None) + def test_download_as_text_w_raw(self): self._download_as_text_helper(raw_download=True) @@ -1862,6 +1905,33 @@ def test_download_as_string(self, mock_warn): stacklevel=1, ) + def test_download_as_string_no_retry(self): + MEDIA_LINK = "http://example.com/media/" + + client = self._make_client() + blob = self._make_one( + "blob-name", bucket=_Bucket(client), properties={"mediaLink": MEDIA_LINK} + ) + client.download_blob_to_file = mock.Mock() + + fetched = blob.download_as_string(retry=None) + self.assertEqual(fetched, b"") + + client.download_blob_to_file.assert_called_once_with( + blob, + mock.ANY, + start=None, + end=None, + raw_download=False, + if_generation_match=None, + if_generation_not_match=None, + if_metageneration_match=None, + if_metageneration_not_match=None, + timeout=self._get_default_timeout(), + checksum="md5", + retry=None, + ) + def test__get_content_type_explicit(self): blob = self._make_one(u"blob-name", bucket=None) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index df780c786..0a2bd231f 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -29,6 +29,7 @@ from . 
import _read_local_json from google.cloud.storage.retry import DEFAULT_RETRY +from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED _SERVICE_ACCOUNT_JSON = _read_local_json("url_signer_v4_test_account.json") @@ -1130,6 +1131,7 @@ def test_download_blob_to_file_with_failure(self): False, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ) def test_download_blob_to_file_with_uri(self): @@ -1160,6 +1162,7 @@ def test_download_blob_to_file_with_uri(self): False, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=DEFAULT_RETRY, ) def test_download_blob_to_file_with_invalid_uri(self): @@ -1171,7 +1174,7 @@ def test_download_blob_to_file_with_invalid_uri(self): with pytest.raises(ValueError, match="URI scheme must be gs"): client.download_blob_to_file("http://bucket_name/path/to/object", file_obj) - def _download_blob_to_file_helper(self, use_chunks, raw_download): + def _download_blob_to_file_helper(self, use_chunks, raw_download, expect_condition_fail=False, **extra_kwargs): from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -1187,9 +1190,15 @@ def _download_blob_to_file_helper(self, use_chunks, raw_download): file_obj = io.BytesIO() if raw_download: - client.download_blob_to_file(blob, file_obj, raw_download=True) + client.download_blob_to_file(blob, file_obj, raw_download=True, **extra_kwargs) else: - client.download_blob_to_file(blob, file_obj) + client.download_blob_to_file(blob, file_obj, **extra_kwargs) + + expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) + if expected_retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED and not expect_condition_fail: + expected_retry = DEFAULT_RETRY + elif expect_condition_fail: + expected_retry = None headers = {"accept-encoding": "gzip"} blob._do_download.assert_called_once_with( @@ -1202,6 +1211,7 @@ def _download_blob_to_file_helper(self, use_chunks, raw_download): raw_download, checksum="md5", timeout=_DEFAULT_TIMEOUT, + retry=expected_retry, ) def test_download_blob_to_file_wo_chunks_wo_raw(self): @@ -1216,6 +1226,15 @@ def test_download_blob_to_file_wo_chunks_w_raw(self): def test_download_blob_to_file_w_chunks_w_raw(self): self._download_blob_to_file_helper(use_chunks=True, raw_download=True) + def test_download_blob_to_file_w_no_retry(self): + self._download_blob_to_file_helper(use_chunks=True, raw_download=True, retry=None) + + def test_download_blob_to_file_w_conditional_retry_pass(self): + self._download_blob_to_file_helper(use_chunks=True, raw_download=True, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, if_generation_match=1) + + def test_download_blob_to_file_w_conditional_retry_fail(self): + self._download_blob_to_file_helper(use_chunks=True, raw_download=True, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, expect_condition_fail=True) + def test_list_blobs(self): from google.cloud.storage.bucket import Bucket diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index 3ddb83273..5831067f5 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -19,6 +19,7 @@ import io import string +from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE from google.cloud.storage.fileio import BlobReader, BlobWriter, SlidingBuffer from google.api_core.exceptions import RequestRangeNotSatisfiable from google.cloud.storage.retry import DEFAULT_RETRY @@ -401,7 +402,8 @@ def test_conditional_retry_pass(self): writer.close() self.assertEqual(upload.transmit_next_chunk.call_count, 5) - def test_num_retries_only(self): + 
@mock.patch("warnings.warn") + def test_num_retries_only(self, mock_warn): blob = mock.Mock() upload = mock.Mock() @@ -446,6 +448,9 @@ def test_num_retries_only(self): ) upload.transmit_next_chunk.assert_called_with(transport) self.assertEqual(upload.transmit_next_chunk.call_count, 4) + mock_warn.assert_called_once_with( + _NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 + ) # Write another byte, finalize and close. writer.write(TEST_BINARY_DATA[32:33]) From 20afb8edb9c0e87b533a4798defdd5a0b4d05a57 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 20 May 2021 16:39:41 -0700 Subject: [PATCH 6/8] coverage and test --- google/cloud/storage/_helpers.py | 9 +- google/cloud/storage/blob.py | 19 ++- google/cloud/storage/client.py | 5 +- google/cloud/storage/fileio.py | 20 ++- tests/unit/test__helpers.py | 20 ++- tests/unit/test_blob.py | 205 ++++++++++++++++++++++++++++--- tests/unit/test_client.py | 31 ++++- tests/unit/test_fileio.py | 29 +++-- 8 files changed, 282 insertions(+), 56 deletions(-) diff --git a/google/cloud/storage/_helpers.py b/google/cloud/storage/_helpers.py index 6f29a3791..72ff56ec4 100644 --- a/google/cloud/storage/_helpers.py +++ b/google/cloud/storage/_helpers.py @@ -52,6 +52,7 @@ "object, or None, instead." ) + def _get_storage_host(): return os.environ.get(STORAGE_EMULATOR_ENV_VAR, _DEFAULT_STORAGE_HOST) @@ -593,9 +594,13 @@ def _api_core_retry_to_resumable_media_retry(retry, num_retries=None): raise ValueError("num_retries and retry arguments are mutually exclusive") elif retry is not None: - return resumable_media.RetryStrategy(max_sleep=retry._maximum, max_cumulative_retry=retry._deadline, initial_delay=retry._initial, multiplier=retry._multiplier) + return resumable_media.RetryStrategy( + max_sleep=retry._maximum, + max_cumulative_retry=retry._deadline, + initial_delay=retry._initial, + multiplier=retry._multiplier, + ) elif num_retries is not None: return resumable_media.RetryStrategy(max_retries=num_retries) else: return resumable_media.RetryStrategy(max_retries=0) - diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 18d9f5b4c..f2f8d9184 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -1754,7 +1754,9 @@ def _do_multipart_upload( upload_url = _add_query_parameters(base_url, name_value_pairs) upload = MultipartUpload(upload_url, headers=headers, checksum=checksum) - upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry, num_retries) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry( + retry, num_retries + ) response = upload.transmit( transport, data, object_metadata, content_type, timeout=timeout @@ -1936,7 +1938,9 @@ def _initiate_resumable_upload( upload_url, chunk_size, headers=headers, checksum=checksum ) - upload._retry_strategy = _api_core_retry_to_resumable_media_retry(retry, num_retries) + upload._retry_strategy = _api_core_retry_to_resumable_media_retry( + retry, num_retries + ) upload.initiate( transport, @@ -2092,7 +2096,7 @@ def _do_upload( if_metageneration_not_match, timeout=_DEFAULT_TIMEOUT, checksum=None, - retry=None + retry=None, ): """Determine an upload strategy and then perform the upload. @@ -2192,7 +2196,10 @@ def _do_upload( # arguments into query_params dictionaries. Media operations work # differently, so here we make a "fake" query_params to feed to the # ConditionalRetryPolicy. 
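# --- Reviewer sketch (illustration only, not part of the patch) ---
# What the reformatted translation helper above yields for the library
# default Retry; assumes google-resumable-media exposes the RetryStrategy
# attributes that test__helpers.py asserts on.
from google import resumable_media
from google.cloud.storage._helpers import (
    _api_core_retry_to_resumable_media_retry,
)
from google.cloud.storage.retry import DEFAULT_RETRY

strategy = _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY)
assert isinstance(strategy, resumable_media.RetryStrategy)
# Backoff shape carries over; custom predicates deliberately do not.
assert strategy.max_sleep == DEFAULT_RETRY._maximum
assert strategy.max_cumulative_retry == DEFAULT_RETRY._deadline
# --- end reviewer sketch ---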
- query_params = {"ifGenerationMatch": if_generation_match, "ifMetagenerationMatch": if_metageneration_match} + query_params = { + "ifGenerationMatch": if_generation_match, + "ifMetagenerationMatch": if_metageneration_match, + } retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) if size is not None and size <= _MAX_MULTIPART_SIZE: @@ -2395,7 +2402,7 @@ def upload_from_file( if_metageneration_not_match, timeout=timeout, checksum=checksum, - retry=retry + retry=retry, ) self._set_properties(created_json) except resumable_media.InvalidResponse as exc: @@ -3524,7 +3531,7 @@ def open( "retry". For uploads only, the following additional arguments are supported: "content_type", "num_retries", "predefined_acl", "checksum". "num_retries" is supported for backwards-compatibility - reasons only; please use "retry" with a Retry object or + reasons only; please use "retry" with a Retry object or ConditionalRetryPolicy instead. :returns: A 'BlobReader' or 'BlobWriter' from diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index ec7803f44..9e0c04e07 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -711,7 +711,10 @@ def download_blob_to_file( # arguments into query_params dictionaries. Media operations work # differently, so here we make a "fake" query_params to feed to the # ConditionalRetryPolicy. - query_params = {"ifGenerationMatch": if_generation_match, "ifMetagenerationMatch": if_metageneration_match} + query_params = { + "ifGenerationMatch": if_generation_match, + "ifMetagenerationMatch": if_metageneration_match, + } retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) if not isinstance(blob_or_uri, Blob): diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 2829ad293..2a34b4d30 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -66,8 +66,6 @@ class BlobReader(io.BufferedIOBase): bytes than the chunk_size are requested, the remainder is buffered. The default is the chunk_size of the blob, or 40MiB. - - :param download_kwargs: Keyword arguments to pass to the underlying API calls. The following arguments are supported: "if_generation_match", "if_generation_not_match", "if_metageneration_match", @@ -86,7 +84,7 @@ def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs self._pos = 0 self._buffer = io.BytesIO() self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE - self._retry=retry + self._retry = retry self._download_kwargs = download_kwargs def read(self, size=-1): @@ -216,7 +214,14 @@ class BlobWriter(io.BufferedIOBase): "num_retries", "predefined_acl", "checksum". """ - def __init__(self, blob, chunk_size=None, text_mode=False, retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, **upload_kwargs): + def __init__( + self, + blob, + chunk_size=None, + text_mode=False, + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, + **upload_kwargs + ): for kwarg in upload_kwargs: if kwarg not in VALID_UPLOAD_KWARGS: raise ValueError( @@ -291,7 +296,12 @@ def _initiate_upload(self): # arguments into query_params dictionaries. Media operations work # differently, so here we make a "fake" query_params to feed to the # ConditionalRetryPolicy. 
- query_params = {"ifGenerationMatch": self._upload_kwargs.get("if_generation_match"), "ifMetagenerationMatch": self._upload_kwargs.get("if_metageneration_match")} + query_params = { + "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"), + "ifMetagenerationMatch": self._upload_kwargs.get( + "if_metageneration_match" + ), + } retry = retry.get_retry_policy_if_conditions_met(query_params=query_params) self._upload_and_transport = self._blob._initiate_resumable_upload( diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index f7278df08..b8b74f96b 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -558,13 +558,17 @@ def test_hostname_and_scheme(self): class Test__api_core_retry_to_resumable_media_retry(unittest.TestCase): def test_conflict(self): - from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) with self.assertRaises(ValueError): _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY, num_retries=2) def test_retry(self): - from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) retry_strategy = _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY) self.assertEqual(retry_strategy.max_sleep, DEFAULT_RETRY._maximum) @@ -573,13 +577,19 @@ def test_retry(self): self.assertEqual(retry_strategy.multiplier, DEFAULT_RETRY._multiplier) def test_num_retries(self): - from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) - retry_strategy = _api_core_retry_to_resumable_media_retry(retry=None, num_retries=2) + retry_strategy = _api_core_retry_to_resumable_media_retry( + retry=None, num_retries=2 + ) self.assertEqual(retry_strategy.max_retries, 2) def test_none(self): - from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry + from google.cloud.storage._helpers import ( + _api_core_retry_to_resumable_media_retry, + ) retry_strategy = _api_core_retry_to_resumable_media_retry(retry=None) self.assertEqual(retry_strategy.max_retries, 0) diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 247ceaaff..8a042545a 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -996,7 +996,9 @@ def _mock_requests_response(status_code, headers, content=b""): response.request = requests.Request("POST", "http://example.com").prepare() return response - def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None, **extra_kwargs): + def _do_download_helper_wo_chunks( + self, w_range, raw_download, timeout=None, **extra_kwargs + ): blob_name = "blob-name" client = mock.Mock() bucket = _Bucket(client) @@ -1077,6 +1079,11 @@ def _do_download_helper_wo_chunks(self, w_range, raw_download, timeout=None, **e def test__do_download_wo_chunks_wo_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=False, raw_download=False) + def test__do_download_wo_chunks_wo_range_wo_raw_w_retry(self): + self._do_download_helper_wo_chunks( + w_range=False, raw_download=False, retry=DEFAULT_RETRY + ) + def test__do_download_wo_chunks_w_range_wo_raw(self): self._do_download_helper_wo_chunks(w_range=True, raw_download=False) @@ -1295,10 +1302,12 @@ def test_download_to_file_w_generation_match(self): False, 
timeout=self._get_default_timeout(), checksum="md5", - retry=DEFAULT_RETRY + retry=DEFAULT_RETRY, ) - def _download_to_file_helper(self, use_chunks, raw_download, timeout=None, **extra_kwargs): + def _download_to_file_helper( + self, use_chunks, raw_download, timeout=None, **extra_kwargs + ): blob_name = "blob-name" client = self._make_client() bucket = _Bucket(client) @@ -1337,7 +1346,7 @@ def _download_to_file_helper(self, use_chunks, raw_download, timeout=None, **ext raw_download, timeout=expected_timeout, checksum="md5", - retry=expected_retry + retry=expected_retry, ) def test_download_to_file_wo_chunks_wo_raw(self): @@ -1360,7 +1369,9 @@ def test_download_to_file_w_custom_timeout(self): use_chunks=False, raw_download=False, timeout=9.58 ) - def _download_to_filename_helper(self, updated, raw_download, timeout=None, **extra_kwargs): + def _download_to_filename_helper( + self, updated, raw_download, timeout=None, **extra_kwargs + ): import os from google.cloud.storage._helpers import _convert_to_timestamp from google.cloud._testing import _NamedTemporaryFile @@ -1378,10 +1389,15 @@ def _download_to_filename_helper(self, updated, raw_download, timeout=None, **ex with _NamedTemporaryFile() as temp: if timeout is None: - blob.download_to_filename(temp.name, raw_download=raw_download, **extra_kwargs) + blob.download_to_filename( + temp.name, raw_download=raw_download, **extra_kwargs + ) else: blob.download_to_filename( - temp.name, raw_download=raw_download, timeout=timeout, **extra_kwargs + temp.name, + raw_download=raw_download, + timeout=timeout, + **extra_kwargs ) if updated is None: @@ -1451,7 +1467,9 @@ def test_download_to_filename_w_updated_wo_raw(self): def test_download_to_filename_w_updated_no_retry(self): updated = "2014-12-06T13:13:50.690Z" - self._download_to_filename_helper(updated=updated, raw_download=False, retry=None) + self._download_to_filename_helper( + updated=updated, raw_download=False, retry=None + ) def test_download_to_filename_wo_updated_wo_raw(self): self._download_to_filename_helper(updated=None, raw_download=False) @@ -1560,7 +1578,9 @@ def _download_as_bytes_helper(self, raw_download, timeout=None, **extra_kwargs): fetched = blob.download_as_bytes(raw_download=raw_download, **extra_kwargs) else: expected_timeout = timeout - fetched = blob.download_as_bytes(raw_download=raw_download, timeout=timeout, **extra_kwargs) + fetched = blob.download_as_bytes( + raw_download=raw_download, timeout=timeout, **extra_kwargs + ) self.assertEqual(fetched, b"") expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) @@ -1730,7 +1750,7 @@ def _download_as_text_helper( no_charset=False, expected_value=u"DEADBEEF", payload=None, - **extra_kwargs, + **extra_kwargs ): if payload is None: if encoding is not None: @@ -2041,6 +2061,7 @@ def _do_multipart_success( mock_get_boundary, client=None, size=None, + num_retries=None, user_project=None, predefined_acl=None, if_generation_match=None, @@ -2094,12 +2115,13 @@ def _do_multipart_success( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2171,6 +2193,28 @@ def _do_multipart_success( def test__do_multipart_upload_no_size(self, mock_get_boundary): self._do_multipart_success(mock_get_boundary, predefined_acl="private") + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_retry(self, mock_get_boundary): + 
self._do_multipart_success( + mock_get_boundary, predefined_acl="private", retry=DEFAULT_RETRY + ) + + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_num_retries(self, mock_get_boundary): + self._do_multipart_success( + mock_get_boundary, predefined_acl="private", num_retries=2 + ) + + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") + def test__do_multipart_upload_no_size_retry_conflict(self, mock_get_boundary): + with self.assertRaises(ValueError): + self._do_multipart_success( + mock_get_boundary, + predefined_acl="private", + num_retries=2, + retry=DEFAULT_RETRY, + ) + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==") def test__do_multipart_upload_no_size_mtls(self, mock_get_boundary): self._do_multipart_success( @@ -2500,6 +2544,10 @@ def test__initiate_resumable_upload_with_retry(self): def test__initiate_resumable_upload_with_num_retries(self): self._initiate_resumable_helper(num_retries=11) + def test__initiate_resumable_upload_with_retry_conflict(self): + with self.assertRaises(ValueError): + self._initiate_resumable_helper(retry=DEFAULT_RETRY, num_retries=2) + def test__initiate_resumable_upload_with_generation_match(self): self._initiate_resumable_helper( if_generation_match=4, if_metageneration_match=4 @@ -2692,12 +2740,13 @@ def _do_resumable_helper( stream, content_type, size, - retry, + num_retries, predefined_acl, if_generation_match, if_generation_not_match, if_metageneration_match, if_metageneration_not_match, + retry=retry, **timeout_kwarg ) @@ -2760,6 +2809,10 @@ def test__do_resumable_upload_with_retry(self): def test__do_resumable_upload_with_num_retries(self): self._do_resumable_helper(num_retries=8) + def test__do_resumable_upload_with_retry_conflict(self): + with self.assertRaises(ValueError): + self._do_resumable_helper(num_retries=9, retry=DEFAULT_RETRY) + def test__do_resumable_upload_with_predefined_acl(self): self._do_resumable_helper(predefined_acl="private") @@ -2832,6 +2885,9 @@ def _do_upload_helper( **timeout_kwarg ) + if retry is DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED: + retry = DEFAULT_RETRY if if_metageneration_match else None + self.assertIs(created_json, mock.sentinel.json) response.json.assert_called_once_with() if size is not None and size <= _MAX_MULTIPART_SIZE: @@ -2896,6 +2952,17 @@ def test__do_upload_uses_resumable_w_custom_timeout(self): def test__do_upload_with_retry(self): self._do_upload_helper(retry=DEFAULT_RETRY) + def test__do_upload_with_num_retries(self): + self._do_upload_helper(num_retries=2) + + def test__do_upload_with_conditional_retry_success(self): + self._do_upload_helper( + retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED, if_metageneration_match=1 + ) + + def test__do_upload_with_conditional_retry_failure(self): + self._do_upload_helper(retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED) + def _upload_from_file_helper(self, side_effect=None, **kwargs): from google.cloud._helpers import UTC @@ -2919,7 +2986,9 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_metageneration_match = kwargs.get("if_metageneration_match", None) if_metageneration_not_match = kwargs.get("if_metageneration_not_match", None) num_retries = kwargs.get("num_retries", None) - default_retry = DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + default_retry = ( + DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + ) retry = kwargs.get("retry", 
default_retry) ret_val = blob.upload_from_file( stream, size=len(data), content_type=content_type, client=client, **kwargs @@ -2945,7 +3014,7 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs): if_metageneration_not_match, timeout=expected_timeout, checksum=None, - retry=retry + retry=retry, ) return stream @@ -2955,8 +3024,6 @@ def test_upload_from_file_success(self): @mock.patch("warnings.warn") def test_upload_from_file_with_retries(self, mock_warn): - from google.cloud.storage import blob as blob_module - self._upload_from_file_helper(retry=DEFAULT_RETRY) @mock.patch("warnings.warn") @@ -2968,6 +3035,13 @@ def test_upload_from_file_with_num_retries(self, mock_warn): blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2 ) + @mock.patch("warnings.warn") + def test_upload_from_file_with_retry_conflict(self, mock_warn): + # Special case here: in a conflict this method should NOT raise an error + # as that's handled further downstream. It should pass both options + # through. + self._upload_from_file_helper(retry=DEFAULT_RETRY, num_retries=2) + def test_upload_from_file_with_rewind(self): stream = self._upload_from_file_helper(rewind=True) assert stream.tell() == 0 @@ -2994,7 +3068,14 @@ def test_upload_from_file_failure(self): self.assertEqual(exc_info.exception.errors, []) def _do_upload_mock_call_helper( - self, blob, client, content_type, size, timeout=None + self, + blob, + client, + content_type, + size, + timeout=None, + num_retries=None, + retry=None, ): self.assertEqual(blob._do_upload.call_count, 1) mock_call = blob._do_upload.mock_calls[0] @@ -3004,7 +3085,7 @@ def _do_upload_mock_call_helper( self.assertEqual(pos_args[0], client) self.assertEqual(pos_args[2], content_type) self.assertEqual(pos_args[3], size) - self.assertEqual(pos_args[4], None) # num_retries + self.assertEqual(pos_args[4], num_retries) # num_retries self.assertIsNone(pos_args[5]) # predefined_acl self.assertIsNone(pos_args[6]) # if_generation_match self.assertIsNone(pos_args[7]) # if_generation_not_match @@ -3012,7 +3093,13 @@ def _do_upload_mock_call_helper( self.assertIsNone(pos_args[9]) # if_metageneration_not_match expected_timeout = self._get_default_timeout() if timeout is None else timeout - self.assertEqual(kwargs, {"timeout": expected_timeout, "checksum": None, "retry": DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED}) + if not retry: + retry = ( + DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED if not num_retries else None + ) + self.assertEqual( + kwargs, {"timeout": expected_timeout, "checksum": None, "retry": retry} + ) return pos_args[1] @@ -3047,6 +3134,72 @@ def test_upload_from_filename(self): self.assertEqual(stream.mode, "rb") self.assertEqual(stream.name, temp.name) + def test_upload_from_filename_with_retry(self): + from google.cloud._testing import _NamedTemporaryFile + + blob = self._make_one("blob-name", bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {"metadata": {"mint": "ice-cream"}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + # Make sure `metadata` is empty before the request. + self.assertIsNone(blob.metadata) + + data = b"soooo much data" + content_type = u"image/svg+xml" + client = mock.sentinel.client + with _NamedTemporaryFile() as temp: + with open(temp.name, "wb") as file_obj: + file_obj.write(data) + + ret_val = blob.upload_from_filename( + temp.name, content_type=content_type, client=client, retry=DEFAULT_RETRY + ) + + # Check the response and side-effects. 
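# --- Reviewer sketch (illustration only, not part of the patch) ---
# The retry/num_retries conflict test above passes both options through
# unchanged; this shows where the conflict finally surfaces, assuming the
# translation helper from _helpers.py earlier in this series.
from google.cloud.storage._helpers import (
    _api_core_retry_to_resumable_media_retry,
)
from google.cloud.storage.retry import DEFAULT_RETRY

try:
    _api_core_retry_to_resumable_media_retry(retry=DEFAULT_RETRY, num_retries=2)
except ValueError:
    # "num_retries and retry arguments are mutually exclusive"
    pass
# --- end reviewer sketch ---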
+ self.assertIsNone(ret_val) + self.assertEqual(blob.metadata, created_json["metadata"]) + + # Check the mock. + stream = self._do_upload_mock_call_helper( + blob, client, content_type, len(data), retry=DEFAULT_RETRY + ) + self.assertTrue(stream.closed) + self.assertEqual(stream.mode, "rb") + self.assertEqual(stream.name, temp.name) + + def test_upload_from_filename_with_num_retries(self): + from google.cloud._testing import _NamedTemporaryFile + + blob = self._make_one("blob-name", bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {"metadata": {"mint": "ice-cream"}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + # Make sure `metadata` is empty before the request. + self.assertIsNone(blob.metadata) + + data = b"soooo much data" + content_type = u"image/svg+xml" + client = mock.sentinel.client + with _NamedTemporaryFile() as temp: + with open(temp.name, "wb") as file_obj: + file_obj.write(data) + + ret_val = blob.upload_from_filename( + temp.name, content_type=content_type, client=client, num_retries=2 + ) + + # Check the response and side-effects. + self.assertIsNone(ret_val) + self.assertEqual(blob.metadata, created_json["metadata"]) + + # Check the mock. + stream = self._do_upload_mock_call_helper( + blob, client, content_type, len(data), num_retries=2 + ) + self.assertTrue(stream.closed) + self.assertEqual(stream.mode, "rb") + self.assertEqual(stream.name, temp.name) + def test_upload_from_filename_w_custom_timeout(self): from google.cloud._testing import _NamedTemporaryFile @@ -3091,6 +3244,11 @@ def _upload_from_string_helper(self, data, **kwargs): self.assertIsNone(ret_val) self.assertEqual(blob.component_count, 5) + extra_kwargs = {} + if "retry" in kwargs: + extra_kwargs["retry"] = kwargs["retry"] + if "num_retries" in kwargs: + extra_kwargs["num_retries"] = kwargs["num_retries"] # Check the mock. 
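# --- Reviewer sketch (illustration only, not part of the patch) ---
# The legacy num_retries path exercised above becomes a bounded
# RetryStrategy downstream, and omitting both options disables media
# retries entirely, matching the test__helpers.py assertions.
from google.cloud.storage._helpers import (
    _api_core_retry_to_resumable_media_retry,
)

assert _api_core_retry_to_resumable_media_retry(retry=None, num_retries=2).max_retries == 2
assert _api_core_retry_to_resumable_media_retry(retry=None).max_retries == 0
# --- end reviewer sketch ---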
payload = _to_bytes(data, encoding="utf-8") stream = self._do_upload_mock_call_helper( @@ -3099,6 +3257,7 @@ def _upload_from_string_helper(self, data, **kwargs): "text/plain", len(payload), kwargs.get("timeout", self._get_default_timeout()), + **extra_kwargs ) self.assertIsInstance(stream, io.BytesIO) self.assertEqual(stream.getvalue(), payload) @@ -3115,6 +3274,14 @@ def test_upload_from_string_w_text(self): data = u"\N{snowman} \N{sailboat}" self._upload_from_string_helper(data) + def test_upload_from_string_w_text_w_retry(self): + data = u"\N{snowman} \N{sailboat}" + self._upload_from_string_helper(data, retry=DEFAULT_RETRY) + + def test_upload_from_string_w_text_w_num_retries(self): + data = u"\N{snowman} \N{sailboat}" + self._upload_from_string_helper(data, num_retries=2) + def _create_resumable_upload_session_helper( self, origin=None, side_effect=None, timeout=None ): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 0a2bd231f..fb366f28d 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -1174,7 +1174,9 @@ def test_download_blob_to_file_with_invalid_uri(self): with pytest.raises(ValueError, match="URI scheme must be gs"): client.download_blob_to_file("http://bucket_name/path/to/object", file_obj) - def _download_blob_to_file_helper(self, use_chunks, raw_download, expect_condition_fail=False, **extra_kwargs): + def _download_blob_to_file_helper( + self, use_chunks, raw_download, expect_condition_fail=False, **extra_kwargs + ): from google.cloud.storage.blob import Blob from google.cloud.storage.constants import _DEFAULT_TIMEOUT @@ -1190,12 +1192,17 @@ def _download_blob_to_file_helper(self, use_chunks, raw_download, expect_conditi file_obj = io.BytesIO() if raw_download: - client.download_blob_to_file(blob, file_obj, raw_download=True, **extra_kwargs) + client.download_blob_to_file( + blob, file_obj, raw_download=True, **extra_kwargs + ) else: client.download_blob_to_file(blob, file_obj, **extra_kwargs) expected_retry = extra_kwargs.get("retry", DEFAULT_RETRY) - if expected_retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED and not expect_condition_fail: + if ( + expected_retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED + and not expect_condition_fail + ): expected_retry = DEFAULT_RETRY elif expect_condition_fail: expected_retry = None @@ -1227,13 +1234,25 @@ def test_download_blob_to_file_w_chunks_w_raw(self): self._download_blob_to_file_helper(use_chunks=True, raw_download=True) def test_download_blob_to_file_w_no_retry(self): - self._download_blob_to_file_helper(use_chunks=True, raw_download=True, retry=None) + self._download_blob_to_file_helper( + use_chunks=True, raw_download=True, retry=None + ) def test_download_blob_to_file_w_conditional_retry_pass(self): - self._download_blob_to_file_helper(use_chunks=True, raw_download=True, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, if_generation_match=1) + self._download_blob_to_file_helper( + use_chunks=True, + raw_download=True, + retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + if_generation_match=1, + ) def test_download_blob_to_file_w_conditional_retry_fail(self): - self._download_blob_to_file_helper(use_chunks=True, raw_download=True, retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, expect_condition_fail=True) + self._download_blob_to_file_helper( + use_chunks=True, + raw_download=True, + retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED, + expect_condition_fail=True, + ) def test_list_blobs(self): from google.cloud.storage.bucket import Bucket diff --git a/tests/unit/test_fileio.py 
b/tests/unit/test_fileio.py index 5831067f5..531fa7249 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -111,12 +111,16 @@ def read_from_fake_data(start=0, end=None, **_): # Read a line. With chunk_size=10, expect three chunks downloaded. self.assertEqual(reader.readline(), TEST_BINARY_DATA[:27]) - blob.download_as_bytes.assert_called_with(start=20, end=30, checksum=None, retry=DEFAULT_RETRY) + blob.download_as_bytes.assert_called_with( + start=20, end=30, checksum=None, retry=DEFAULT_RETRY + ) self.assertEqual(blob.download_as_bytes.call_count, 3) # Read another line. self.assertEqual(reader.readline(), TEST_BINARY_DATA[27:]) - blob.download_as_bytes.assert_called_with(start=50, end=60, checksum=None, retry=DEFAULT_RETRY) + blob.download_as_bytes.assert_called_with( + start=50, end=60, checksum=None, retry=DEFAULT_RETRY + ) self.assertEqual(blob.download_as_bytes.call_count, 6) blob.size = len(TEST_BINARY_DATA) @@ -125,7 +129,10 @@ def read_from_fake_data(start=0, end=None, **_): # Read all lines. The readlines algorithm will attempt to read past the end of the last line once to verify there is no more to read. self.assertEqual(b"".join(reader.readlines()), TEST_BINARY_DATA) blob.download_as_bytes.assert_called_with( - start=len(TEST_BINARY_DATA), end=len(TEST_BINARY_DATA) + 10, checksum=None, retry=DEFAULT_RETRY + start=len(TEST_BINARY_DATA), + end=len(TEST_BINARY_DATA) + 10, + checksum=None, + retry=DEFAULT_RETRY, ) self.assertEqual(blob.download_as_bytes.call_count, 13) @@ -312,9 +319,7 @@ def test_conditional_retry_failure(self): # gives us more control over close() for test purposes. chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. writer = BlobWriter( - blob, - chunk_size=chunk_size, - content_type=PLAIN_CONTENT_TYPE, + blob, chunk_size=chunk_size, content_type=PLAIN_CONTENT_TYPE, ) # The transmit_next_chunk method must actually consume bytes from the @@ -336,8 +341,8 @@ def test_conditional_retry_failure(self): blob.bucket.client, writer._buffer, PLAIN_CONTENT_TYPE, - None, # size - None, # num_retries + None, # size + None, # num_retries chunk_size=chunk_size, retry=None, ) @@ -388,8 +393,8 @@ def test_conditional_retry_pass(self): blob.bucket.client, writer._buffer, PLAIN_CONTENT_TYPE, - None, # size - None, # num_retries + None, # size + None, # num_retries chunk_size=chunk_size, retry=DEFAULT_RETRY, if_metageneration_match=1, @@ -441,8 +446,8 @@ def test_num_retries_only(self, mock_warn): blob.bucket.client, writer._buffer, PLAIN_CONTENT_TYPE, - None, # size - 2, # num_retries + None, # size + 2, # num_retries chunk_size=chunk_size, retry=None, ) From 4b3767272765f3478c26be0b4bc068534bca86bc Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 10 Jun 2021 17:56:51 -0700 Subject: [PATCH 7/8] Finish coverage and add docstrings --- google/cloud/storage/blob.py | 243 +++++++++++++++++++++++++++++++++ google/cloud/storage/client.py | 21 +++ google/cloud/storage/fileio.py | 44 ++++++ tests/unit/test_fileio.py | 38 ++++++ 4 files changed, 346 insertions(+) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index f2f8d9184..6d4b128e9 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -1100,6 +1100,28 @@ def download_to_file( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. 
A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`google.cloud.exceptions.NotFound` """ client = self._require_client(client) @@ -1198,6 +1220,28 @@ def download_to_filename( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`google.cloud.exceptions.NotFound` """ client = self._require_client(client) @@ -1305,6 +1349,28 @@ def download_as_bytes( emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. 
Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: bytes :returns: The data stored in this blob. @@ -1394,6 +1460,28 @@ def download_as_string( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: bytes :returns: The data stored in this blob. @@ -1488,6 +1576,28 @@ def download_as_text( Can also be passed as a tuple (connect_timeout, read_timeout). See :meth:`requests.Session.request` documentation for details. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: text :returns: The data stored in this blob, decoded to text. """ @@ -1693,6 +1803,21 @@ def _do_multipart_upload( manually-set checksum value. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will configure backoff and timeout options. Custom + predicates (customizable error codes) are not supported for media + operations such as this one. 
+
+        This private method does not accept ConditionalRetryPolicy values
+        because the information necessary to evaluate the policy is instead
+        evaluated in blob._do_upload().
+
+        See the retry.py source code and docstrings in this package
+        (google.cloud.storage.retry) for information on retry types and how
+        to configure them.
+
         :rtype: :class:`~requests.Response`
         :returns: The "200 OK" response object returned after the multipart
                   upload request.
@@ -1874,6 +1999,21 @@ def _initiate_resumable_upload(
             delete the uploaded object automatically. Supported values
             are "md5", "crc32c" and None. The default is None.

+        :type retry: google.api_core.retry.Retry
+        :param retry: (Optional) How to retry the RPC. A None value will disable
+            retries. A google.api_core.retry.Retry value will enable retries,
+            and the object will configure backoff and timeout options. Custom
+            predicates (customizable error codes) are not supported for media
+            operations such as this one.
+
+            This private method does not accept ConditionalRetryPolicy values
+            because the information necessary to evaluate the policy is instead
+            evaluated in blob._do_upload().
+
+            See the retry.py source code and docstrings in this package
+            (google.cloud.storage.retry) for information on retry types and how
+            to configure them.
+
         :rtype: tuple
         :returns: Pair of
@@ -2052,6 +2192,21 @@ def _do_resumable_upload(
             delete the uploaded object automatically. Supported values
             are "md5", "crc32c" and None. The default is None.

+        :type retry: google.api_core.retry.Retry
+        :param retry: (Optional) How to retry the RPC. A None value will disable
+            retries. A google.api_core.retry.Retry value will enable retries,
+            and the object will configure backoff and timeout options. Custom
+            predicates (customizable error codes) are not supported for media
+            operations such as this one.
+
+            This private method does not accept ConditionalRetryPolicy values
+            because the information necessary to evaluate the policy is instead
+            evaluated in blob._do_upload().
+
+            See the retry.py source code and docstrings in this package
+            (google.cloud.storage.retry) for information on retry types and how
+            to configure them.
+
         :rtype: :class:`~requests.Response`
         :returns: The "200 OK" response object returned after the final chunk
                   is uploaded.
@@ -2184,6 +2339,28 @@ def _do_upload(
             attempting to delete the corrupted file. Supported values are
             "md5", "crc32c" and None. The default is None.

+        :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
+        :param retry: (Optional) How to retry the RPC. A None value will disable
+            retries. A google.api_core.retry.Retry value will enable retries,
+            and the object will define retriable response codes and errors and
+            configure backoff and timeout options.
+
+            A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
+            Retry object and activates it only if certain conditions are met.
+            This class exists to provide safe defaults for RPC calls that are
+            not technically safe to retry normally (due to potential data
+            duplication or other side-effects) but become safe to retry if a
+            condition such as if_metageneration_match is set.
+
+            See the retry.py source code and docstrings in this package
+            (google.cloud.storage.retry) for information on retry types and how
+            to configure them.
+
+            Media operations (downloads and uploads) do not support non-default
+            predicates in a Retry object. The default will always be used.
Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :rtype: dict :returns: The parsed JSON from the "200 OK" response. This will be the **only** response in the multipart case and it will be the @@ -2370,6 +2547,28 @@ def upload_from_file( attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. @@ -2513,6 +2712,28 @@ def upload_from_filename( google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. """ content_type = self._get_content_type(content_type, filename=filename) @@ -2635,6 +2856,28 @@ def upload_from_string( google.resumable_media.common.DataCorruption on a mismatch and attempting to delete the corrupted file. Supported values are "md5", "crc32c" and None. The default is None. + + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. 
A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. """ data = _to_bytes(data, encoding="utf-8") string_buffer = BytesIO(data) diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index 9e0c04e07..e03f593b4 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -679,6 +679,27 @@ def download_blob_to_file( downloads where chunk_size is set) an INFO-level log will be emitted. Supported values are "md5", "crc32c" and None. The default is "md5". + retry (google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy) + (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. + This class exists to provide safe defaults for RPC calls that are + not technically safe to retry normally (due to potential data + duplication or other side-effects) but become safe to retry if a + condition such as if_metageneration_match is set. + + See the retry.py source code and docstrings in this package + (google.cloud.storage.retry) for information on retry types and how + to configure them. + + Media operations (downloads and uploads) do not support non-default + predicates in a Retry object. The default will always be used. Other + configuration changes for Retry objects such as delays and deadlines + are respected. Examples: Download a blob using a blob resource. diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 2a34b4d30..e74b9ed4a 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -66,6 +66,28 @@ class BlobReader(io.BufferedIOBase): bytes than the chunk_size are requested, the remainder is buffered. The default is the chunk_size of the blob, or 40MiB. + :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy + :param retry: (Optional) How to retry the RPC. A None value will disable + retries. A google.api_core.retry.Retry value will enable retries, + and the object will define retriable response codes and errors and + configure backoff and timeout options. + + A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a + Retry object and activates it only if certain conditions are met. 
+        This class exists to provide safe defaults for RPC calls that are
+        not technically safe to retry normally (due to potential data
+        duplication or other side-effects) but become safe to retry if a
+        condition such as if_metageneration_match is set.
+
+        See the retry.py source code and docstrings in this package
+        (google.cloud.storage.retry) for information on retry types and how
+        to configure them.
+
+        Media operations (downloads and uploads) do not support non-default
+        predicates in a Retry object. The default will always be used. Other
+        configuration changes for Retry objects such as delays and deadlines
+        are respected.
+
     :param download_kwargs: Keyword arguments to pass to the underlying API
         calls. The following arguments are supported: "if_generation_match",
         "if_generation_not_match", "if_metageneration_match",
@@ -207,6 +229,28 @@ class BlobWriter(io.BufferedIOBase):
         changes the behavior of flush() to conform to TextIOWrapper's
         expectations.
 
+    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
+    :param retry: (Optional) How to retry the RPC. A None value will disable
+        retries. A google.api_core.retry.Retry value will enable retries,
+        and the object will define retriable response codes and errors and
+        configure backoff and timeout options.
+
+        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
+        Retry object and activates it only if certain conditions are met.
+        This class exists to provide safe defaults for RPC calls that are
+        not technically safe to retry normally (due to potential data
+        duplication or other side-effects) but become safe to retry if a
+        condition such as if_metageneration_match is set.
+
+        See the retry.py source code and docstrings in this package
+        (google.cloud.storage.retry) for information on retry types and how
+        to configure them.
+
+        Media operations (downloads and uploads) do not support non-default
+        predicates in a Retry object. The default will always be used. Other
+        configuration changes for Retry objects such as delays and deadlines
+        are respected.
+
     :param upload_kwargs: Keyword arguments to pass to the underlying API
         calls. The following arguments are supported: "if_generation_match",
         "if_generation_not_match", "if_metageneration_match",
diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py
index 531fa7249..9a8478f15 100644
--- a/tests/unit/test_fileio.py
+++ b/tests/unit/test_fileio.py
@@ -407,6 +407,44 @@ def test_conditional_retry_pass(self):
         writer.close()
         self.assertEqual(upload.transmit_next_chunk.call_count, 5)
 
+    def test_num_retries_and_retry_conflict(self):
+        blob = mock.Mock()
+
+        blob._initiate_resumable_upload.side_effect = ValueError
+
+        with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1):
+            # Create a writer.
+            # It would be normal to use a context manager here, but not doing so
+            # gives us more control over close() for test purposes.
+            chunk_size = 8  # Note: Real upload requires a multiple of 256KiB.
+            writer = BlobWriter(
+                blob,
+                chunk_size=chunk_size,
+                content_type=PLAIN_CONTENT_TYPE,
+                num_retries=2,
+                retry=DEFAULT_RETRY,
+            )
+
+            # Write under chunk_size. This should be buffered and the upload not
+            # initiated.
+            writer.write(TEST_BINARY_DATA[0:4])
+            blob._initiate_resumable_upload.assert_not_called()
+
+            # Write over chunk_size. The mock will raise a ValueError, simulating
+            # actual behavior when num_retries and retry are both specified.
+ with self.assertRaises(ValueError): + writer.write(TEST_BINARY_DATA[4:32]) + + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, # size + 2, # num_retries + chunk_size=chunk_size, + retry=DEFAULT_RETRY, + ) + @mock.patch("warnings.warn") def test_num_retries_only(self, mock_warn): blob = mock.Mock() From 39f03b1a81e49660994157136babea26a9de71f3 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 11 Jun 2021 10:31:38 -0700 Subject: [PATCH 8/8] address tseaver comments --- google/cloud/storage/client.py | 1 + tests/unit/test_client.py | 95 +--------------------------------- tests/unit/test_fileio.py | 76 +++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 93 deletions(-) diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index 5bbe72ad0..df42f0c11 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -1260,6 +1260,7 @@ def list_blobs( extra_params=extra_params, page_start=_blobs_page_start, timeout=timeout, + retry=retry, ) iterator.bucket = bucket iterator.prefixes = set() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 0773126b8..33ec331d6 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -1467,99 +1467,6 @@ def test_download_blob_to_file_w_conditional_retry_fail(self): expect_condition_fail=True, ) - def test_list_blobs(self): - from google.cloud.storage.bucket import Bucket - - BUCKET_NAME = "bucket-name" - - credentials = _make_credentials() - client = self._make_one(project="PROJECT", credentials=credentials) - connection = _make_connection({"items": []}) - - with mock.patch( - "google.cloud.storage.client.Client._connection", - new_callable=mock.PropertyMock, - ) as client_mock: - client_mock.return_value = connection - - bucket_obj = Bucket(client, BUCKET_NAME) - iterator = client.list_blobs(bucket_obj) - blobs = list(iterator) - - self.assertEqual(blobs, []) - connection.api_request.assert_called_once_with( - method="GET", - path="/b/%s/o" % BUCKET_NAME, - query_params={"projection": "noAcl"}, - timeout=self._get_default_timeout(), - retry=DEFAULT_RETRY, - ) - - def test_list_blobs_w_all_arguments_and_user_project(self): - from google.cloud.storage.bucket import Bucket - - BUCKET_NAME = "name" - USER_PROJECT = "user-project-123" - MAX_RESULTS = 10 - PAGE_TOKEN = "ABCD" - PREFIX = "subfolder" - DELIMITER = "/" - START_OFFSET = "c" - END_OFFSET = "g" - INCLUDE_TRAILING_DELIMITER = True - VERSIONS = True - PROJECTION = "full" - FIELDS = "items/contentLanguage,nextPageToken" - EXPECTED = { - "maxResults": 10, - "pageToken": PAGE_TOKEN, - "prefix": PREFIX, - "delimiter": DELIMITER, - "startOffset": START_OFFSET, - "endOffset": END_OFFSET, - "includeTrailingDelimiter": INCLUDE_TRAILING_DELIMITER, - "versions": VERSIONS, - "projection": PROJECTION, - "fields": FIELDS, - "userProject": USER_PROJECT, - } - - credentials = _make_credentials() - client = self._make_one(project=USER_PROJECT, credentials=credentials) - connection = _make_connection({"items": []}) - - with mock.patch( - "google.cloud.storage.client.Client._connection", - new_callable=mock.PropertyMock, - ) as client_mock: - client_mock.return_value = connection - - bucket = Bucket(client, BUCKET_NAME, user_project=USER_PROJECT) - iterator = client.list_blobs( - bucket_or_name=bucket, - max_results=MAX_RESULTS, - page_token=PAGE_TOKEN, - prefix=PREFIX, - delimiter=DELIMITER, - start_offset=START_OFFSET, - end_offset=END_OFFSET, - 
include_trailing_delimiter=INCLUDE_TRAILING_DELIMITER,
-                versions=VERSIONS,
-                projection=PROJECTION,
-                fields=FIELDS,
-                timeout=42,
-            )
-            blobs = list(iterator)
-
-            self.assertEqual(blobs, [])
-            connection.api_request.assert_called_once_with(
-                method="GET",
-                path="/b/%s/o" % BUCKET_NAME,
-                query_params=EXPECTED,
-                timeout=42,
-                retry=DEFAULT_RETRY,
-            )
-
     def _download_blob_to_file_helper(
         self, use_chunks, raw_download, expect_condition_fail=False, **extra_kwargs
     ):
@@ -1652,6 +1559,7 @@ def test_list_blobs_w_defaults_w_bucket_obj(self):
             extra_params=expected_extra_params,
             page_start=expected_page_start,
             timeout=self._get_default_timeout(),
+            retry=DEFAULT_RETRY,
         )
 
     def test_list_blobs_w_explicit_w_user_project(self):
@@ -1727,6 +1635,7 @@ def test_list_blobs_w_explicit_w_user_project(self):
             extra_params=expected_extra_params,
             page_start=expected_page_start,
             timeout=timeout,
+            retry=retry,
         )
 
     def test_list_buckets_wo_project(self):
diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py
index 9a8478f15..6ce9b4990 100644
--- a/tests/unit/test_fileio.py
+++ b/tests/unit/test_fileio.py
@@ -42,6 +42,9 @@ def test_attributes(self):
         self.assertEqual(reader._chunk_size, 256)
         self.assertEqual(reader._retry, DEFAULT_RETRY)
 
+    def test_attributes_explicit(self):
+        blob = mock.Mock()
+        blob.chunk_size = 256
         reader = BlobReader(blob, chunk_size=1024, retry=None)
         self.assertEqual(reader._chunk_size, 1024)
         self.assertIsNone(reader._retry)
@@ -91,6 +94,24 @@ def read_from_fake_data(start=0, end=None, **_):
 
         reader.close()
 
+    def test_retry_passed_through(self):
+        blob = mock.Mock()
+
+        def read_from_fake_data(start=0, end=None, **_):
+            return TEST_BINARY_DATA[start:end]
+
+        blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data)
+        download_kwargs = {"if_metageneration_match": 1}
+        reader = BlobReader(blob, chunk_size=8, retry=None, **download_kwargs)
+
+        # Read and trigger the first download of chunk_size.
+        self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1])
+        blob.download_as_bytes.assert_called_once_with(
+            start=0, end=8, checksum=None, retry=None, **download_kwargs
+        )
+
+        reader.close()
+
     def test_416_error_handled(self):
         blob = mock.Mock()
         blob.download_as_bytes = mock.Mock(
@@ -225,6 +246,9 @@ def test_attributes(self):
         self.assertTrue(writer.writable())
         self.assertEqual(writer._chunk_size, 256 * 1024)
 
+    def test_attributes_explicit(self):
+        blob = mock.Mock()
+        blob.chunk_size = 256 * 1024
         writer = BlobWriter(blob, chunk_size=512 * 1024, retry=DEFAULT_RETRY)
         self.assertEqual(writer._chunk_size, 512 * 1024)
         self.assertEqual(writer._retry, DEFAULT_RETRY)
@@ -407,6 +431,58 @@ def test_conditional_retry_pass(self):
         writer.close()
         self.assertEqual(upload.transmit_next_chunk.call_count, 5)
 
+    @mock.patch("warnings.warn")
+    def test_forced_default_retry(self, mock_warn):
+        blob = mock.Mock()
+
+        upload = mock.Mock()
+        transport = mock.Mock()
+
+        blob._initiate_resumable_upload.return_value = (upload, transport)
+
+        with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1):
+            # Create a writer.
+            # It would be normal to use a context manager here, but not doing so
+            # gives us more control over close() for test purposes.
+            chunk_size = 8  # Note: Real upload requires a multiple of 256KiB.
+            writer = BlobWriter(
+                blob,
+                chunk_size=chunk_size,
+                content_type=PLAIN_CONTENT_TYPE,
+                retry=DEFAULT_RETRY,
+            )
+
+            # The transmit_next_chunk method must actually consume bytes from the
+            # sliding buffer for the flush() feature to work properly.
+            upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read(
+                chunk_size
+            )
+
+            # Write under chunk_size. This should be buffered and the upload not
+            # initiated.
+            writer.write(TEST_BINARY_DATA[0:4])
+            blob._initiate_resumable_upload.assert_not_called()
+
+            # Write over chunk_size. This should result in upload initialization
+            # and multiple chunks uploaded.
+            writer.write(TEST_BINARY_DATA[4:32])
+            blob._initiate_resumable_upload.assert_called_once_with(
+                blob.bucket.client,
+                writer._buffer,
+                PLAIN_CONTENT_TYPE,
+                None,  # size
+                None,  # num_retries
+                chunk_size=chunk_size,
+                retry=DEFAULT_RETRY,
+            )
+            upload.transmit_next_chunk.assert_called_with(transport)
+            self.assertEqual(upload.transmit_next_chunk.call_count, 4)
+
+            # Write another byte, finalize and close.
+            writer.write(TEST_BINARY_DATA[32:33])
+            writer.close()
+            self.assertEqual(upload.transmit_next_chunk.call_count, 5)
+
     def test_num_retries_and_retry_conflict(self):
         blob = mock.Mock()
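Taken together, the docstrings added in this series describe one contract: a
google.api_core.retry.Retry value always retries (custom predicates are ignored
for media operations, while delays and deadlines are respected), a
ConditionalRetryPolicy retries only when a precondition such as
if_generation_match makes the call idempotent, and None disables retries. A
minimal caller-side sketch of that contract; the bucket and object names are
hypothetical placeholders:

    from google.cloud import storage
    from google.cloud.storage.retry import (
        DEFAULT_RETRY,
        DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
    )

    client = storage.Client()
    blob = client.bucket("my-bucket").blob("data.bin")

    # ConditionalRetryPolicy: retries activate only because
    # if_generation_match=0 makes this create-if-absent upload idempotent.
    blob.upload_from_filename(
        "data.bin",
        if_generation_match=0,
        retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
    )

    # Plain Retry: retried unconditionally, honoring the configured backoff
    # and a 120-second retry deadline.
    data = blob.download_as_bytes(retry=DEFAULT_RETRY.with_deadline(120))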
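The fileio tests above exercise the same parameter on the file-like wrappers:
BlobReader and BlobWriter accept retry and forward it, along with any
preconditions, to each underlying download or upload call, while combining the
deprecated num_retries with retry is rejected with ValueError once the upload
starts. A short sketch under the same assumptions as the previous one, reusing
its blob:

    from google.cloud.storage.fileio import BlobReader, BlobWriter
    from google.cloud.storage.retry import DEFAULT_RETRY

    # Reader with retries disabled; the precondition is passed through to
    # every chunked download_as_bytes() call.
    reader = BlobReader(blob, retry=None, if_metageneration_match=1)
    first = reader.read(1)
    reader.close()

    # Writer with the default retry applied to each uploaded chunk; close()
    # flushes the sliding buffer and finalizes the resumable upload.
    writer = BlobWriter(blob, retry=DEFAULT_RETRY)
    writer.write(b"hello world")
    writer.close()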